This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new cf4c4a28535 [to rc/1.3.3] Fix ttl bugs (#13671)
cf4c4a28535 is described below
commit cf4c4a285356bf630c008681747cf893df42f85a
Author: shuwenwei <[email protected]>
AuthorDate: Tue Oct 8 14:41:06 2024 +0800
[to rc/1.3.3] Fix ttl bugs (#13671)
* fix bugs
* add it
* fix spell
---
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 36 ++++++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 1 +
.../performer/impl/FastCompactionPerformer.java | 33 +++++++++++---------
.../execute/utils/MultiTsFileDeviceIterator.java | 17 +++++++---
4 files changed, 69 insertions(+), 18 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index c9559280ebc..3de28346d18 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -1131,6 +1131,42 @@ public class IoTDBSessionSimpleIT {
}
}
+ @Test
+ @Category({LocalStandaloneIT.class, ClusterIT.class})
+ public void insertRecordsWithExpiredDataTest()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ List<Long> times = new ArrayList<>();
+ List<List<String>> measurements = new ArrayList<>();
+ List<List<TSDataType>> datatypes = new ArrayList<>();
+ List<List<Object>> values = new ArrayList<>();
+ List<String> devices = new ArrayList<>();
+
+ devices.add("root.sg.d1");
+ addLine(
+ times,
+ measurements,
+ datatypes,
+ values,
+ 3L,
+ "s1",
+ "s2",
+ TSDataType.INT32,
+ TSDataType.INT32,
+ 1,
+ 2);
+ session.executeNonQueryStatement("set ttl to root.sg.d1 1");
+ try {
+ session.insertRecords(devices, times, measurements, datatypes, values);
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("less than ttl time bound"));
+ }
+ session.executeNonQueryStatement("unset ttl to root.sg.d1");
+ SessionDataSet dataSet = session.executeQueryStatement("select * from
root.sg.d1");
+ Assert.assertFalse(dataSet.hasNext());
+ }
+ }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertStringRecordsOfOneDeviceWithOrderTest() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5b45eb7a853..43a9ea55210 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3506,6 +3506,7 @@ public class DataRegion implements IDataRegionForQuery {
DateTimeUtils.convertLongToDate(insertRowNode.getTime()),
DateTimeUtils.convertLongToDate(
CommonDateTimeUtils.currentTime() -
deviceTTL))));
+
insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length);
continue;
}
// init map
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index c77e312a2b4..6b756f59d7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -21,10 +21,10 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionTaskSummaryException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
@@ -46,7 +47,6 @@ import
org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -131,20 +131,25 @@ public class FastCompactionPerformer
// actually exist but the judgment return device being existed.
sortedSourceFiles.addAll(seqFiles);
sortedSourceFiles.addAll(unseqFiles);
+ long ttl = deviceIterator.getTTLForCurrentDevice();
sortedSourceFiles.removeIf(
- x -> {
- try {
- return x.definitelyNotContains(device)
- || !x.isDeviceAlive(
- device,
- DataNodeTTLCache.getInstance()
- // TODO: remove deviceId conversion
- .getTTL(((PlainDeviceID) device).toStringID()));
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
- }
- });
+ x -> x.definitelyNotContains(device) || !x.isDeviceAlive(device,
ttl));
sortedSourceFiles.sort(Comparator.comparingLong(x ->
x.getStartTime(device)));
+ if (ttl != Long.MAX_VALUE) {
+ Deletion ttlDeletion =
+ new Deletion(
+ new PartialPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+ Long.MAX_VALUE,
+ Long.MIN_VALUE,
+ deviceIterator.getTimeLowerBoundForCurrentDevice());
+ for (TsFileResource sourceFile : sortedSourceFiles) {
+ modificationCache
+ .computeIfAbsent(
+ sourceFile.getTsFile().getName(),
+ k -> PatternTreeMapFactory.getModsPatternTreeMap())
+ .append(ttlDeletion.getPath(), ttlDeletion);
+ }
+ }
if (sortedSourceFiles.isEmpty()) {
// device is out of dated in all source files
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 15892d6bd7d..277738e2a04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -70,6 +70,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap =
new HashMap<>();
private final Map<TsFileResource, List<Modification>> modificationCache =
new HashMap<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
+ private long ttlForCurrentDevice;
private long timeLowerBoundForCurrentDevice;
/**
@@ -207,13 +208,21 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
deviceIteratorMap.remove(resource);
}
- timeLowerBoundForCurrentDevice =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance()
- .getTTL(((PlainDeviceID)
currentDevice.getLeft()).toStringID());
+ ttlForCurrentDevice =
+ DataNodeTTLCache.getInstance()
+ .getTTL(((PlainDeviceID) currentDevice.getLeft()).toStringID());
+ timeLowerBoundForCurrentDevice = CommonDateTimeUtils.currentTime() -
ttlForCurrentDevice;
return currentDevice;
}
+ public long getTTLForCurrentDevice() {
+ return ttlForCurrentDevice;
+ }
+
+ public long getTimeLowerBoundForCurrentDevice() {
+ return timeLowerBoundForCurrentDevice;
+ }
+
/**
* Get all measurements and schemas of the current device from source files.
Traverse all the
* files from the newest to the oldest in turn and start traversing the
index tree from the