This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FixFileTimeIndexBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e9768b18175b26ea5844602f6eb95bca11199e00
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 17 10:02:40 2024 +0800

    Fix query scan returning duplicated or unordered timestamps when a TsFileResource is degraded
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 ++
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 ++
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../apache/iotdb/db/it/IoTDBFileTimeIndexIT.java   | 117 +++++++++++++++++++++
 .../execution/operator/source/SeriesScanUtil.java  |  30 +++++-
 .../dataregion/read/QueryDataSource.java           |  14 ++-
 .../dataregion/tsfile/TsFileResource.java          |  12 ++-
 8 files changed, 186 insertions(+), 7 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 791de1ce8c4..f1117c43e57 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -508,6 +508,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+    setProperty("chunk_timeseriesmeta_free_memory_proportion", 
queryMemoryProportion);
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return 
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 46d6751cc10..464bcae77c4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -519,4 +519,11 @@ public class MppSharedCommonConfig implements CommonConfig {
     
cnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes);
     return this;
   }
+
+  @Override
+  public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+    dnConfig.setQueryMemoryProportion(queryMemoryProportion);
+    cnConfig.setQueryMemoryProportion(queryMemoryProportion);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 4b3994f8458..45c15e4e7a3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -365,4 +365,9 @@ public class RemoteCommonConfig implements CommonConfig {
       int pipeConnectorRequestSliceThresholdBytes) {
     return this;
   }
+
+  @Override
+  public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index f6ad62dacff..fb8e2976817 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -161,4 +161,6 @@ public interface CommonConfig {
 
   CommonConfig setPipeConnectorRequestSliceThresholdBytes(
       int pipeConnectorRequestSliceThresholdBytes);
+
+  CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java
new file mode 100644
index 00000000000..a0d0a083701
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBFileTimeIndexIT {
+
+  private static final String[] sqls =
+      new String[] {
+        "insert into root.db.d1(time,s1) values(2,2)",
+        "insert into root.db.d1(time,s1) values(3,3)",
+        "flush",
+        "insert into root.db.d2(time,s1) values(5,5)",
+        "flush",
+        "insert into root.db.d1(time,s1) values(4,4)",
+        "flush",
+        "insert into root.db.d2(time,s1) values(1,1)",
+        "insert into root.db.d1(time,s1) values(3,30)",
+        "insert into root.db.d1(time,s1) values(4,40)",
+        "flush",
+        "insert into root.db.d2(time,s1) values(2,2)",
+        "insert into root.db.d1(time,s1) values(4,400)",
+        "flush",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Locale.setDefault(Locale.ENGLISH);
+
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionGroupExtensionPolicy("CUSTOM")
+        .setDefaultDataRegionGroupNumPerDatabase(1)
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false)
+        .setQueryMemoryProportion("1:100:200:50:200:200:0:250");
+    // Adjust memtable threshold size to make it flush automatically
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData();
+  }
+
+  private static void prepareData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : sqls) {
+        statement.addBatch(sql);
+      }
+      statement.executeBatch();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testQuery() throws SQLException {
+    long[] time = {2L, 3L, 4L};
+    double[] value = {2.0f, 30.0f, 400.0f};
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery("select s1 from 
root.db.d1")) {
+      int cnt = 0;
+      while (resultSet.next()) {
+        assertEquals(time[cnt], resultSet.getLong(1));
+        assertEquals(value[cnt], resultSet.getDouble(2), 0.00001);
+        cnt++;
+      }
+      assertEquals(time.length, cnt);
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index c46f1e9ca5b..cf7e633ec26 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -63,6 +63,7 @@ import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.function.ToLongFunction;
 
@@ -1158,17 +1159,27 @@ public class SeriesScanUtil implements Accountable {
       unpackUnseqTsFileResource();
     }
     while (orderUtils.hasNextSeqResource() && 
orderUtils.isCurSeqOverlappedWith(endpointTime)) {
-      unpackSeqTsFileResource();
+      Optional<ITimeSeriesMetadata> timeSeriesMetadata = 
unpackSeqTsFileResource();
+      // asc: if current seq tsfile's endTime >= endpointTime, we don't need to continue
+      // desc: if current seq tsfile's startTime <= endpointTime, we don't need to continue
+      if (timeSeriesMetadata.isPresent()
+          && orderUtils.overlappedSeqResourceSearchingNeedStop(
+              endpointTime, timeSeriesMetadata.get().getStatistics())) {
+        break;
+      }
     }
   }
 
-  private void unpackSeqTsFileResource() throws IOException {
+  private Optional<ITimeSeriesMetadata> unpackSeqTsFileResource() throws 
IOException {
     ITimeSeriesMetadata timeseriesMetadata =
         loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true);
     // skip if data type is mismatched which may be caused by delete
     if (timeseriesMetadata != null && 
timeseriesMetadata.typeMatch(getTsDataTypeList())) {
       timeseriesMetadata.setSeq(true);
       seqTimeSeriesMetadata.add(timeseriesMetadata);
+      return Optional.of(timeseriesMetadata);
+    } else {
+      return Optional.empty();
     }
   }
 
@@ -1336,6 +1347,9 @@ public class SeriesScanUtil implements Accountable {
     TsFileResource getNextUnseqFileResource(boolean isDelete);
 
     void setCurSeqFileIndex(QueryDataSource dataSource);
+
+    boolean overlappedSeqResourceSearchingNeedStop(
+        long endPointTime, Statistics<? extends Object> currentStatistics);
   }
 
   class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1454,6 +1468,12 @@ public class SeriesScanUtil implements Accountable {
     public void setCurSeqFileIndex(QueryDataSource dataSource) {
       curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
     }
+
+    @Override
+    public boolean overlappedSeqResourceSearchingNeedStop(
+        long endPointTime, Statistics<?> currentStatistics) {
+      return currentStatistics.getStartTime() <= endPointTime;
+    }
   }
 
   class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1572,6 +1592,12 @@ public class SeriesScanUtil implements Accountable {
     public void setCurSeqFileIndex(QueryDataSource dataSource) {
       curSeqFileIndex = 0;
     }
+
+    @Override
+    public boolean overlappedSeqResourceSearchingNeedStop(
+        long endPointTime, Statistics<?> currentStatistics) {
+      return currentStatistics.getEndTime() >= endPointTime;
+    }
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index 704bdeb6902..10f843c8173 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -45,7 +45,10 @@ public class QueryDataSource implements IQueryDataSource {
 
   private int curSeqIndex = -1;
 
-  // asc: startTime; desc: endTime
+  // asc: startTime, will be Long.MIN_VALUE if current tsfile resource is degraded
+  // desc: endTime, will be Long.MAX_VALUE if current tsfile resource is degraded
+  // if current tsfile resource is degraded, it will always be considered to be overlapping with
+  // current point
   private long curSeqOrderTime = 0;
 
   private Boolean curSeqSatisfied = null;
@@ -109,7 +112,7 @@ public class QueryDataSource implements IQueryDataSource {
     boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0;
     if (res && curIndex != this.curSeqIndex) {
       this.curSeqIndex = curIndex;
-      this.curSeqOrderTime = seqResources.get(curIndex).getOrderTime(deviceID, 
ascending);
+      this.curSeqOrderTime = 
seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending);
       this.curSeqSatisfied = null;
     }
     return res;
@@ -151,7 +154,9 @@ public class QueryDataSource implements IQueryDataSource {
     if (res && curIndex != this.curUnSeqIndex) {
       this.curUnSeqIndex = curIndex;
       this.curUnSeqOrderTime =
-          
unseqResources.get(unSeqFileOrderIndex[curIndex]).getOrderTime(deviceID, 
ascending);
+          unseqResources
+              .get(unSeqFileOrderIndex[curIndex])
+              .getOrderTimeForUnseq(deviceID, ascending);
       this.curUnSeqSatisfied = null;
     }
     return res;
@@ -208,7 +213,8 @@ public class QueryDataSource implements IQueryDataSource {
     int index = 0;
     for (TsFileResource resource : unseqResources) {
       orderTimeToIndexMap
-          .computeIfAbsent(resource.getOrderTime(deviceId, ascending), key -> 
new ArrayList<>())
+          .computeIfAbsent(
+              resource.getOrderTimeForUnseq(deviceId, ascending), key -> new 
ArrayList<>())
           .add(index++);
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 8c3d8e76a85..ed681abe8ad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -601,7 +601,17 @@ public class TsFileResource implements PersistentResource {
     }
   }
 
-  public long getOrderTime(IDeviceID deviceId, boolean ascending) {
+  // cannot use FileTimeIndex
+  public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) {
+    if (timeIndex instanceof ArrayDeviceTimeIndex) {
+      return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
+    } else {
+      return ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+    }
+  }
+
+  // can use FileTimeIndex
+  public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) {
     return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
   }
 

Reply via email to