This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f460ef89032 Support special path for device TTL
f460ef89032 is described below
commit f460ef890327a44862adad17f51f3be7784cb4b7
Author: 周沛辰 <[email protected]>
AuthorDate: Fri May 31 12:42:15 2024 +0800
Support special path for device TTL
---
.../iotdb/confignode/manager/TTLManager.java | 7 ++--
.../impl/DataNodeInternalRPCServiceImpl.java | 6 ++-
.../operator/schema/source/DeviceSchemaSource.java | 2 +-
.../execution/operator/source/SeriesScanUtil.java | 3 +-
.../analyze/cache/schema/DataNodeTTLCache.java | 38 ++++++++++++-------
.../config/executor/ClusterConfigTaskExecutor.java | 3 +-
.../plan/planner/OperatorTreeGenerator.java | 4 +-
.../statement/metadata/ShowDatabaseStatement.java | 4 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 20 +++++++---
.../db/service/metrics/CompactionMetrics.java | 2 +-
.../iotdb/db/storageengine/StorageEngine.java | 21 +++++++----
.../db/storageengine/dataregion/DataRegion.java | 8 ++--
.../performer/impl/FastCompactionPerformer.java | 11 ++++--
.../execute/utils/MultiTsFileDeviceIterator.java | 40 ++++++++++----------
.../dataregion/memtable/TsFileProcessor.java | 22 +++++++----
.../dataregion/utils/TsFileResourceUtils.java | 6 +--
.../schema/SchemaQueryScanOperatorTest.java | 1 +
.../TsFileValidationCorrectnessTests.java | 28 ++++++++++++++
.../settle/SettleCompactionTaskTest.java | 44 +++++++++++++++++++++-
.../apache/iotdb/commons/schema/ttl/TTLCache.java | 1 +
20 files changed, 194 insertions(+), 77 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
index ec02daff250..af4fb49a0cb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
@@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static
org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATER_NO_REGEX;
public class TTLManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(TTLManager.class);
@@ -55,7 +55,8 @@ public class TTLManager {
}
/** Set ttl when creating database. */
- public TSStatus setTTL(DatabaseSchemaPlan databaseSchemaPlan, final boolean
isGeneratedByPipe) {
+ public TSStatus setTTL(DatabaseSchemaPlan databaseSchemaPlan, final boolean
isGeneratedByPipe)
+ throws IllegalPathException {
long ttl = databaseSchemaPlan.getSchema().getTTL();
if (ttl <= 0) {
TSStatus errorStatus = new
TSStatus(TSStatusCode.TTL_CONFIG_ERROR.getStatusCode());
@@ -68,7 +69,7 @@ public class TTLManager {
ttl = ttl <= 0 ? Long.MAX_VALUE : ttl;
SetTTLPlan setTTLPlan =
new SetTTLPlan(
-
databaseSchemaPlan.getSchema().getName().split(PATH_SEPARATER_NO_REGEX), ttl);
+
PathUtils.splitPathToDetachedNodes(databaseSchemaPlan.getSchema().getName()),
ttl);
setTTLPlan.setDataBase(true);
return configManager.getProcedureManager().setTTL(setTTLPlan,
isGeneratedByPipe);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 614956e9616..6fc2a98e6a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1758,7 +1758,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus setTTL(TSetTTLReq req) throws TException {
- return storageEngine.setTTL(req);
+ try {
+ return storageEngine.setTTL(req);
+ } catch (IllegalPathException e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
index 03c53d5a791..37a9d63865c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
@@ -101,7 +101,7 @@ public class DeviceSchemaSource implements
ISchemaSource<IDeviceSchemaInfo> {
.getColumnBuilder(0)
.writeBinary(new Binary(device.getFullPath(),
TSFileConfig.STRING_CHARSET));
int templateId = device.getTemplateId();
- long ttl = DataNodeTTLCache.getInstance().getTTL(device.getFullPath());
+ long ttl =
DataNodeTTLCache.getInstance().getTTL(device.getPartialPath().getNodes());
// TODO: make it more readable, like "30 days" or "10 hours"
String ttlStr = ttl == Long.MAX_VALUE ? IoTDBConstant.TTL_INFINITE :
String.valueOf(ttl);
if (hasSgCol) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 3f50ee552a4..e09bd364084 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -175,7 +175,8 @@ public class SeriesScanUtil implements Accountable {
this.dataSource = dataSource;
// updated filter concerning TTL
-
scanOptions.setTTL(DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevice()));
+ scanOptions.setTTL(
+
DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevicePath().getNodes()));
// init file index
orderUtils.setCurSeqFileIndex(dataSource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
index 3b951c23a5a..07b835896d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
@@ -18,14 +18,14 @@
*/
package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static
org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATER_NO_REGEX;
-
public class DataNodeTTLCache {
private final TTLCache ttlCache;
@@ -43,57 +43,67 @@ public class DataNodeTTLCache {
private static final DataNodeTTLCache INSTANCE = new DataNodeTTLCache();
}
- public void setTTL(String path, long ttl) {
+ @TestOnly
+ public void setTTL(String path, long ttl) throws IllegalPathException {
lock.writeLock().lock();
try {
- ttlCache.setTTL(path.split(PATH_SEPARATER_NO_REGEX), ttl);
+ ttlCache.setTTL(PathUtils.splitPathToDetachedNodes(path), ttl);
} finally {
lock.writeLock().unlock();
}
}
- public void setTTL(Map<String, Long> pathTTLs) {
+ public void setTTL(String[] path, long ttl) {
lock.writeLock().lock();
try {
- pathTTLs.forEach((k, v) ->
ttlCache.setTTL(k.split(PATH_SEPARATER_NO_REGEX), v));
+ ttlCache.setTTL(path, ttl);
} finally {
lock.writeLock().unlock();
}
}
- public void unsetTTL(String path) {
+ public void unsetTTL(String[] path) {
lock.writeLock().lock();
try {
- ttlCache.unsetTTL(path.split(PATH_SEPARATER_NO_REGEX));
+ ttlCache.unsetTTL(path);
} finally {
lock.writeLock().unlock();
}
}
- public long getTTL(String path) {
+ public long getTTL(String path) throws IllegalPathException {
+ lock.readLock().lock();
+ try {
+ return ttlCache.getClosestTTL(PathUtils.splitPathToDetachedNodes(path));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public long getTTL(String[] path) {
lock.readLock().lock();
try {
- return ttlCache.getClosestTTL(path.split(PATH_SEPARATER_NO_REGEX));
+ return ttlCache.getClosestTTL(path);
} finally {
lock.readLock().unlock();
}
}
/** Get all ttl map under path node. */
- public Map<String, Long> getTTLUnderOneNode(String path) {
+ public Map<String, Long> getTTLUnderOneNode(String path) throws
IllegalPathException {
lock.readLock().lock();
try {
- return
ttlCache.getAllTTLUnderOneNode(path.split(PATH_SEPARATER_NO_REGEX));
+ return
ttlCache.getAllTTLUnderOneNode(PathUtils.splitPathToDetachedNodes(path));
} finally {
lock.readLock().unlock();
}
}
/** Get ttl of one specific path node. If this node does not set ttl, then
return -1. */
- public long getNodeTTL(String path) {
+ public long getNodeTTL(String path) throws IllegalPathException {
lock.readLock().lock();
try {
- return ttlCache.getLastNodeTTL(path.split(PATH_SEPARATER_NO_REGEX));
+ return ttlCache.getLastNodeTTL(PathUtils.splitPathToDetachedNodes(path));
} finally {
lock.readLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d2e2c1c10b8..d0ebc933382 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.executable.ExecutableManager;
@@ -355,7 +356,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
TShowDatabaseResp resp = client.showDatabase(req);
// build TSBlock
showDatabaseStatement.buildTSBlock(resp.getDatabaseInfoMap(), future);
- } catch (IOException | ClientManagerException | TException e) {
+ } catch (IOException | ClientManagerException | TException |
IllegalPathException e) {
future.setException(e);
}
return future;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 5aad7518dca..a12a9c5face 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2625,7 +2625,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(
context.getGlobalTimeFilter(),
- DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevice())),
+
DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevicePath().getNodes())),
timeValuePair)) { // cached last value is not satisfied
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
@@ -2836,7 +2836,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(
context.getGlobalTimeFilter(),
- DataNodeTTLCache.getInstance().getTTL(devicePath.getFullPath())),
+ DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes())),
timeValuePair)) { // cached last value is not satisfied
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDatabaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDatabaseStatement.java
index f7cf5122b0a..cda2cf4af4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDatabaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDatabaseStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.statement.metadata;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo;
@@ -76,7 +77,8 @@ public class ShowDatabaseStatement extends ShowStatement
implements IConfigState
}
public void buildTSBlock(
- Map<String, TDatabaseInfo> storageGroupInfoMap,
SettableFuture<ConfigTaskResult> future) {
+ Map<String, TDatabaseInfo> storageGroupInfoMap,
SettableFuture<ConfigTaskResult> future)
+ throws IllegalPathException {
List<TSDataType> outputDataTypes =
isDetailed
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 8b2f477fff8..f48ef5f3255 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -47,6 +48,7 @@ import
org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
@@ -363,7 +365,8 @@ public class DataNode implements DataNodeMBean {
* <p>6. All TTL information
*/
private void storeRuntimeConfigurations(
- List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration
runtimeConfiguration) {
+ List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration
runtimeConfiguration)
+ throws StartupException {
/* Store ConfigNodeList */
List<TEndPoint> configNodeList = new ArrayList<>();
for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
@@ -965,17 +968,22 @@ public class DataNode implements DataNodeMBean {
resourcesInformationHolder.setPipePluginMetaList(list);
}
- private void initTTLInformation(byte[] allTTLInformation) {
+ private void initTTLInformation(byte[] allTTLInformation) throws
StartupException {
if (allTTLInformation == null) {
return;
}
ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
int mapSize = ReadWriteIOUtils.readInt(buffer);
for (int i = 0; i < mapSize; i++) {
- DataNodeTTLCache.getInstance()
- .setTTL(
- Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
- ReadWriteIOUtils.readLong(buffer));
+ try {
+ DataNodeTTLCache.getInstance()
+ .setTTL(
+ PathUtils.splitPathToDetachedNodes(
+
Objects.requireNonNull(ReadWriteIOUtils.readString(buffer))),
+ ReadWriteIOUtils.readLong(buffer));
+ } catch (IllegalPathException e) {
+ throw new StartupException(e);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index 3218c403060..f3f464b20ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -926,7 +926,7 @@ public class CompactionMetrics implements IMetricSet {
Metric.COMPACTION_TASK_SELECTION_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- "insertion");
+ "settle");
seqInnerSpaceCompactionTaskSelectedFileNum =
metricService.getOrCreateHistogram(
Metric.COMPACTION_TASK_SELECTED_FILE.toString(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index ef549890119..e0455b3329f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -29,12 +29,14 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -82,6 +84,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -764,23 +767,25 @@ public class StorageEngine implements IService {
}
/** Update ttl cache in dataNode. */
- public TSStatus setTTL(TSetTTLReq req) {
- String path = req.getPathPattern().get(0);
+ public TSStatus setTTL(TSetTTLReq req) throws IllegalPathException {
+ String[] path =
PathUtils.splitPathToDetachedNodes(req.getPathPattern().get(0));
long ttl = req.getTTL();
boolean isDataBase = req.isDataBase;
if (ttl == TTLCache.NULL_TTL) {
DataNodeTTLCache.getInstance().unsetTTL(path);
if (isDataBase) {
- DataNodeTTLCache.getInstance()
- .unsetTTL(
- path + IoTDBConstant.PATH_SEPARATOR +
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+ // unset ttl to path.**
+ String[] pathWithWildcard = Arrays.copyOf(path, path.length + 1);
+ pathWithWildcard[pathWithWildcard.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ DataNodeTTLCache.getInstance().unsetTTL(pathWithWildcard);
}
} else {
DataNodeTTLCache.getInstance().setTTL(path, ttl);
if (isDataBase) {
- DataNodeTTLCache.getInstance()
- .setTTL(
- path + IoTDBConstant.PATH_SEPARATOR +
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, ttl);
+ // set ttl to path.**
+ String[] pathWithWildcard = Arrays.copyOf(path, path.length + 1);
+ pathWithWildcard[pathWithWildcard.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ DataNodeTTLCache.getInstance().setTTL(pathWithWildcard, ttl);
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 70a2b308b5d..5a58b3e1dbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -873,7 +873,7 @@ public class DataRegion implements IDataRegionForQuery {
public void insert(InsertRowNode insertRowNode) throws WriteProcessException
{
// reject insertions that are out of ttl
long deviceTTL =
-
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getFullPath());
+
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
throw new OutOfTTLException(
insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() -
deviceTTL));
@@ -939,7 +939,7 @@ public class DataRegion implements IDataRegionForQuery {
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = true;
long deviceTTL =
-
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDevicePath().getFullPath());
+
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDevicePath().getNodes());
/*
* assume that batch has been sorted by client
@@ -3227,7 +3227,7 @@ public class DataRegion implements IDataRegionForQuery {
}
long deviceTTL =
DataNodeTTLCache.getInstance()
- .getTTL(insertRowsOfOneDeviceNode.getDevicePath().getFullPath());
+ .getTTL(insertRowsOfOneDeviceNode.getDevicePath().getNodes());
long[] costsForMetrics = new long[4];
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new
HashMap<>();
for (int i = 0; i <
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
@@ -3343,7 +3343,7 @@ public class DataRegion implements IDataRegionForQuery {
for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsNode.getInsertRowNodeList().get(i);
long deviceTTL =
-
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getFullPath());
+
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
insertRowsNode
.getResults()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index 09d24e45f20..06f9570cffd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -131,13 +131,18 @@ public class FastCompactionPerformer
sortedSourceFiles.addAll(seqFiles);
sortedSourceFiles.addAll(unseqFiles);
sortedSourceFiles.removeIf(
- x ->
- x.definitelyNotContains(device)
+ x -> {
+ try {
+ return x.definitelyNotContains(device)
|| !x.isDeviceAlive(
device,
DataNodeTTLCache.getInstance()
// TODO: remove deviceId conversion
- .getTTL(((PlainDeviceID) device).toStringID())));
+ .getTTL(((PlainDeviceID) device).toStringID()));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
sortedSourceFiles.sort(Comparator.comparingLong(x ->
x.getStartTime(device)));
if (sortedSourceFiles.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index c7669ed1da8..40b156058c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -403,14 +403,14 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
List<Modification> list =
new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
// add outdated device mods by ttl
- for (IDeviceID device : r.getDevices()) {
- // TODO: remove deviceId conversion
- long timeLowerBound =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance()
- .getTTL(((PlainDeviceID) device).toStringID());
- if (r.getStartTime(device) < timeLowerBound) {
- try {
+ try {
+ for (IDeviceID device : r.getDevices()) {
+ // TODO: remove deviceId conversion
+ long timeLowerBound =
+ CommonDateTimeUtils.currentTime()
+ - DataNodeTTLCache.getInstance()
+ .getTTL(((PlainDeviceID) device).toStringID());
+ if (r.getStartTime(device) < timeLowerBound) {
list.add(
new Deletion(
CompactionPathUtils.getPath(device)
@@ -418,10 +418,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBound));
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
}
}
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
}
return list;
});
@@ -629,14 +629,14 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
List<Modification> list =
new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
// add outdated device mods by ttl
- for (IDeviceID device : r.getDevices()) {
- // TODO: remove deviceId conversion
- long timeLowerBound =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance()
- .getTTL(((PlainDeviceID)
device).toStringID());
- if (r.getStartTime(device) < timeLowerBound) {
- try {
+ try {
+ for (IDeviceID device : r.getDevices()) {
+ // TODO: remove deviceId conversion
+ long timeLowerBound =
+ CommonDateTimeUtils.currentTime()
+ - DataNodeTTLCache.getInstance()
+ .getTTL(((PlainDeviceID)
device).toStringID());
+ if (r.getStartTime(device) < timeLowerBound) {
list.add(
new Deletion(
CompactionPathUtils.getPath(device)
@@ -644,10 +644,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBound));
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
}
}
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
}
return list;
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 29187978710..c695b71c1cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -1713,7 +1714,9 @@ public class TsFileProcessor {
private List<IChunkMetadata> getVisibleMetadataListFromWriterByDeviceID(
QueryContext queryContext, IDeviceID deviceID) throws
IllegalPathException {
- long timeLowerBound = getQueryTimeLowerBound(((PlainDeviceID)
deviceID).toStringID());
+ long timeLowerBound =
+ getQueryTimeLowerBound(
+ PathUtils.splitPathToDetachedNodes(((PlainDeviceID)
deviceID).toStringID()));
List<List<ChunkMetadata>> chunkMetaDataListForDevice =
writer.getVisibleMetadataList(deviceID, null);
List<ChunkMetadata> processedChunkMetadataForOneDevice = new ArrayList<>();
@@ -1732,7 +1735,8 @@ public class TsFileProcessor {
}
private List<IChunkMetadata>
getAlignedVisibleMetadataListFromWriterByDeviceID(
- QueryContext queryContext, IDeviceID deviceID) throws
QueryProcessException {
+ QueryContext queryContext, IDeviceID deviceID)
+ throws QueryProcessException, IllegalPathException {
List<AlignedChunkMetadata> alignedChunkMetadataForOneDevice = new
ArrayList<>();
List<List<Modification>> modifications = new ArrayList<>();
List<List<ChunkMetadata>> chunkMetaDataListForDevice =
@@ -1766,7 +1770,9 @@ public class TsFileProcessor {
}
}
- long timeLowerBound = getQueryTimeLowerBound(((PlainDeviceID)
deviceID).toStringID());
+ long timeLowerBound =
+ getQueryTimeLowerBound(
+ PathUtils.splitPathToDetachedNodes(((PlainDeviceID)
deviceID).toStringID()));
ModificationUtils.modifyAlignedChunkMetaData(alignedChunkMetadataForOneDevice,
modifications);
alignedChunkMetadataForOneDevice.removeIf(x -> x.getEndTime() <
timeLowerBound);
return new ArrayList<>(alignedChunkMetadataForOneDevice);
@@ -1786,7 +1792,7 @@ public class TsFileProcessor {
for (PartialPath seriesPath : pathList) {
Map<String, List<IChunkMetadata>> measurementToChunkMetaList = new
HashMap<>();
Map<String, List<IChunkHandle>> measurementToChunkHandleList = new
HashMap<>();
- long timeLowerBound = getQueryTimeLowerBound(seriesPath.getDevice());
+ long timeLowerBound =
getQueryTimeLowerBound(seriesPath.getDevicePath().getNodes());
for (IMemTable flushingMemTable : flushingMemTables) {
if (flushingMemTable.isSignalMemTable()) {
continue;
@@ -1871,7 +1877,9 @@ public class TsFileProcessor {
for (Map.Entry<IDeviceID, Boolean> entry :
devicePathToAligned.entrySet()) {
IDeviceID devicePath = entry.getKey();
boolean isAligned = entry.getValue();
- long timeLowerBound = getQueryTimeLowerBound(((PlainDeviceID)
devicePath).toStringID());
+ long timeLowerBound =
+ getQueryTimeLowerBound(
+ PathUtils.splitPathToDetachedNodes(((PlainDeviceID)
devicePath).toStringID()));
Map<String, List<IChunkMetadata>> measurementToChunkMetadataList =
new HashMap<>();
Map<String, List<IChunkHandle>> measurementToMemChunkHandleList =
new HashMap<>();
for (IMemTable flushingMemTable : flushingMemTables) {
@@ -1961,7 +1969,7 @@ public class TsFileProcessor {
try {
for (PartialPath seriesPath : seriesPaths) {
List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
- long timeLowerBound = getQueryTimeLowerBound(seriesPath.getDevice());
+ long timeLowerBound =
getQueryTimeLowerBound(seriesPath.getDevicePath().getNodes());
for (IMemTable flushingMemTable : flushingMemTables) {
if (flushingMemTable.isSignalMemTable()) {
continue;
@@ -2022,7 +2030,7 @@ public class TsFileProcessor {
}
}
- private long getQueryTimeLowerBound(String device) {
+ private long getQueryTimeLowerBound(String[] device) {
long deviceTTL = DataNodeTTLCache.getInstance().getTTL(device);
return deviceTTL != Long.MAX_VALUE
? CommonDateTimeUtils.currentTime() - deviceTTL
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index 35e0a79e223..287e49a9851 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -122,10 +122,10 @@ public class TsFileResourceUtils {
}
public static boolean validateTsFileIsComplete(TsFileResource resource) {
- if (!resource.getTsFile().exists()
- || resource.getTsFile().length()
+ if (resource.getTsFile().exists()
+ && resource.getTsFile().length()
< TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
- // the file does not exist or file size is smaller than magic string and
version number
+ // file size is smaller than magic string and version number
logger.error(
String.format(
"target file %s is smaller than magic string and version number
size", resource));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaQueryScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaQueryScanOperatorTest.java
index cd562cef316..bc93de0f185 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -91,6 +91,7 @@ public class SchemaQueryScanOperatorTest {
.thenReturn(META_SCAN_OPERATOR_TEST_SG + ".device0");
Mockito.when(deviceSchemaInfo.isAligned()).thenReturn(false);
Mockito.when(deviceSchemaInfo.getTemplateId()).thenReturn(-1);
+ Mockito.when(deviceSchemaInfo.getPartialPath()).thenReturn(partialPath);
operatorContext.setDriverContext(
new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<IDeviceSchemaInfo> deviceSchemaSource =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/TsFileValidationCorrectnessTests.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/TsFileValidationCorrectnessTests.java
index 28a862824e0..3ef4d3353e0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/TsFileValidationCorrectnessTests.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/TsFileValidationCorrectnessTests.java
@@ -295,4 +295,32 @@ public class TsFileValidationCorrectnessTests {
boolean success =
TsFileValidator.getInstance().validateTsFile(tsFileResource);
Assert.assertTrue(success);
}
+
+ @Test
+ public void testDeletedFile() throws IOException {
+ String path = dir + File.separator + "test12.tsfile";
+ TsFileResource tsFileResource = new TsFileResource(new File(path));
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(tsFileResource)) {
+ writer.startChunkGroup("d1");
+ VectorMeasurementSchema vectorMeasurementSchema =
+ new VectorMeasurementSchema(
+ "d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32});
+ AlignedChunkWriterImpl chunkWriter = new
AlignedChunkWriterImpl(vectorMeasurementSchema);
+ chunkWriter.getTimeChunkWriter().write(1);
+ chunkWriter.getTimeChunkWriter().write(2);
+ chunkWriter.getTimeChunkWriter().write(3);
+ chunkWriter.getValueChunkWriterByIndex(0).getPageWriter().write(1, 1,
false);
+ chunkWriter.getValueChunkWriterByIndex(0).getPageWriter().write(2, 1,
false);
+ chunkWriter.getValueChunkWriterByIndex(0).getPageWriter().write(3, 1,
false);
+ chunkWriter.sealCurrentPage();
+ chunkWriter.writeToFileWriter(writer.getFileWriter());
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ tsFileResource.updateStartTime(new PlainDeviceID("d1"), 1);
+ tsFileResource.updateEndTime(new PlainDeviceID("d1"), 3);
+ tsFileResource.serialize();
+ tsFileResource.remove();
+
Assert.assertTrue(TsFileValidator.getInstance().validateTsFile(tsFileResource));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
index f5bc90740f1..4b175e2077b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
@@ -315,6 +315,48 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
validateTargetDatas(sourceDatas, Collections.emptyList());
}
+ @Test
+ public void settleWithOnlyAllDirtyFilesByTTL2()
+ throws MetadataException, IOException, WriteProcessException {
+ createFiles(6, 5, 10, 100, 0, 0, 0, 0, isAligned, true);
+ createFiles(5, 2, 3, 50, 0, 10000, 50, 50, isAligned, false);
+
+ generateTTL(5, 10);
+
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ Map<PartialPath, List<TimeValuePair>> sourceDatas =
+ readSourceFiles(createTimeseries(6, 6, isAligned),
Collections.emptyList());
+
+ List<TsFileResource> selectedFiles = new ArrayList<>(seqResources);
+
+ SettleCompactionTask task =
+ new SettleCompactionTask(
+ 0, tsFileManager, Collections.emptyList(), selectedFiles, true,
getPerformer(), 0);
+ Assert.assertTrue(task.start());
+
+ selectedFiles.clear();
+ selectedFiles.addAll(unseqResources);
+ task =
+ new SettleCompactionTask(
+ 0, tsFileManager, Collections.emptyList(), selectedFiles, false,
getPerformer(), 0);
+ Assert.assertTrue(task.start());
+
+ for (TsFileResource tsFileResource : seqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
+ }
+
+ Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
+ Assert.assertEquals(0, tsFileManager.getTsFileList(false).size());
+
+ DataNodeTTLCache.getInstance().clearAllTTL();
+ validateTargetDatas(sourceDatas, Collections.emptyList());
+ }
+
@Test
public void settleWithOnlyPartialDirtyFilesByTTL()
throws IOException, MetadataException, WriteProcessException {
@@ -430,7 +472,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
return timeseriesPath;
}
- protected void generateTTL(int deviceNum, long ttl) {
+ protected void generateTTL(int deviceNum, long ttl) throws
IllegalPathException {
for (int dIndex = 0; dIndex < deviceNum; dIndex++) {
DataNodeTTLCache.getInstance()
.setTTL(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java
index 1f079453ab3..081f009b5bb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+/** TTL Cache Tree, which is a prefix B+ tree with each node storing TTL. */
@NotThreadSafe
public class TTLCache {