This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 10611563ae [IOTDB-4448] Implement pattern based data deletion in
DataRegion to speed up deleting timeseries (#7364)
10611563ae is described below
commit 10611563ae337fc52c024e99d99e8b943082083f
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Sep 29 13:13:42 2022 +0800
[IOTDB-4448] Implement pattern based data deletion in DataRegion to speed
up deleting timeseries (#7364)
[IOTDB-4448] Implement pattern based data deletion in DataRegion to speed
up deleting timeseries (#7364)
---
.../procedure/impl/DeleteTimeSeriesProcedure.java | 144 ++++++++++++---------
.../procedure/state/DeleteTimeSeriesState.java | 2 +-
.../org/apache/iotdb/commons/path/PartialPath.java | 18 +++
.../schemaregion/rocksdb/RSchemaRegion.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 79 ++++++-----
.../memtable/AlignedWritableMemChunkGroup.java | 20 ++-
.../db/engine/memtable/WritableMemChunkGroup.java | 28 ++--
.../iotdb/db/engine/storagegroup/DataRegion.java | 29 +++--
.../db/engine/storagegroup/TsFileResource.java | 9 ++
.../storagegroup/timeindex/DeviceTimeIndex.java | 27 ++++
.../storagegroup/timeindex/FileTimeIndex.java | 7 +
.../engine/storagegroup/timeindex/ITimeIndex.java | 4 +
.../storagegroup/timeindex/V012FileTimeIndex.java | 8 ++
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 23 +++-
.../db/metadata/schemaregion/ISchemaRegion.java | 2 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 12 +-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 2 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 7 +-
18 files changed, 295 insertions(+), 128 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
index c9be7cd1a9..57f2b776c5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
@@ -94,38 +94,46 @@ public class DeleteTimeSeriesProcedure
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env,
DeleteTimeSeriesState state)
throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
- switch (state) {
- case CONSTRUCT_BLACK_LIST:
- LOGGER.info("Construct schema black list of timeseries {}",
requestMessage);
- if (constructBlackList(env) > 0) {
- setNextState(DeleteTimeSeriesState.CLEAN_DATANODE_SCHEMA_CACHE);
+ long startTime = System.currentTimeMillis();
+ try {
+ switch (state) {
+ case CONSTRUCT_BLACK_LIST:
+ LOGGER.info("Construct schema black list of timeseries {}",
requestMessage);
+ if (constructBlackList(env) > 0) {
+ setNextState(DeleteTimeSeriesState.CLEAN_DATANODE_SCHEMA_CACHE);
+ break;
+ } else {
+ setFailure(
+ new ProcedureException(
+ new PathNotExistException(
+ patternTree.getAllPathPatterns().stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toList()))));
+ return Flow.NO_MORE_STATE;
+ }
+ case CLEAN_DATANODE_SCHEMA_CACHE:
+ LOGGER.info("Invalidate cache of timeseries {}", requestMessage);
+ invalidateCache(env);
+ break;
+ case DELETE_DATA:
+ LOGGER.info("Delete data of timeseries {}", requestMessage);
+ deleteData(env);
break;
- } else {
- setFailure(
- new ProcedureException(
- new PathNotExistException(
- patternTree.getAllPathPatterns().stream()
- .map(PartialPath::getFullPath)
- .collect(Collectors.toList()))));
+ case DELETE_TIMESERIES_SCHEMA:
+ LOGGER.info("Delete timeseries schema of {}", requestMessage);
+ deleteTimeSeriesSchema(env);
return Flow.NO_MORE_STATE;
- }
- case CLEAN_DATANODE_SCHEMA_CACHE:
- LOGGER.info("Invalidate cache of timeseries {}", requestMessage);
- invalidateCache(env);
- break;
- case DELETE_DATA:
- LOGGER.info("Delete data of timeseries {}", requestMessage);
- deleteData(env);
- break;
- case DELETE_TIMESERIES:
- LOGGER.info("Delete timeseries schema of {}", requestMessage);
- deleteTimeSeries(env);
- return Flow.NO_MORE_STATE;
- default:
- setFailure(new ProcedureException("Unrecognized state " +
state.toString()));
- return Flow.NO_MORE_STATE;
+ default:
+ setFailure(new ProcedureException("Unrecognized state " +
state.toString()));
+ return Flow.NO_MORE_STATE;
+ }
+ return Flow.HAS_MORE_STATE;
+ } finally {
+ LOGGER.info(
+ String.format(
+ "DeleteTimeSeries-[%s] costs %sms",
+ state.toString(), (System.currentTimeMillis() - startTime)));
}
- return Flow.HAS_MORE_STATE;
}
// return the total num of timeseries in schema black list
@@ -195,6 +203,19 @@ public class DeleteTimeSeriesProcedure
}
private void deleteData(ConfigNodeProcedureEnv env) {
+ deleteDataWithRawPathPattern(env);
+ }
+
+ private void deleteDataWithRawPathPattern(ConfigNodeProcedureEnv env) {
+ executeDeleteData(env, patternTree);
+ if (isFailed()) {
+ return;
+ }
+ setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES_SCHEMA);
+ }
+
+ // todo this will be used in IDTable scenarios
+ private void deleteDataWithResolvedPath(ConfigNodeProcedureEnv env) {
Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
Map<TDataNodeLocation, List<TConsensusGroupId>>
dataNodeSchemaRegionGroupGroupIdMap =
@@ -233,39 +254,44 @@ public class DeleteTimeSeriesProcedure
continue;
}
- Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
- env.getConfigManager().getRelatedDataRegionGroup(patternTree);
-
- // target timeseries has no data
- if (relatedDataRegionGroup.isEmpty()) {
- continue;
- }
+ executeDeleteData(env, patternTree);
- RegionTask<TSStatus> deleteDataTask =
- new RegionTask<TSStatus>("delete data", env, relatedDataRegionGroup)
{
- @Override
- Map<Integer, TSStatus> sendRequest(
- TDataNodeLocation dataNodeLocation, List<TConsensusGroupId>
consensusGroupIdList) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new
HashMap<>();
- dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(),
dataNodeLocation);
- DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
- new
DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance()
- .sendAsyncRequestToDataNodeWithRetry(
- new TDeleteDataForDeleteTimeSeriesReq(
- new ArrayList<>(consensusGroupIdList),
- preparePatternTreeBytesData(patternTree)),
- dataNodeTask);
- return dataNodeTask.getDataNodeResponseMap();
- }
- };
- deleteDataTask.setExecuteOnAllReplicaset(true);
- deleteDataTask.execute();
if (isFailed()) {
return;
}
}
- setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES);
+ setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES_SCHEMA);
+ }
+
+ private void executeDeleteData(ConfigNodeProcedureEnv env, PathPatternTree
patternTree) {
+ Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
+ env.getConfigManager().getRelatedDataRegionGroup(patternTree);
+
+ // target timeseries has no data
+ if (relatedDataRegionGroup.isEmpty()) {
+ return;
+ }
+
+ RegionTask<TSStatus> deleteDataTask =
+ new RegionTask<TSStatus>("delete data", env, relatedDataRegionGroup) {
+ @Override
+ Map<Integer, TSStatus> sendRequest(
+ TDataNodeLocation dataNodeLocation, List<TConsensusGroupId>
consensusGroupIdList) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new
HashMap<>();
+ dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(),
dataNodeLocation);
+ DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
+ new
DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ new TDeleteDataForDeleteTimeSeriesReq(
+ new ArrayList<>(consensusGroupIdList),
+ preparePatternTreeBytesData(patternTree)),
+ dataNodeTask);
+ return dataNodeTask.getDataNodeResponseMap();
+ }
+ };
+ deleteDataTask.setExecuteOnAllReplicaset(true);
+ deleteDataTask.execute();
}
private PathPatternTree fetchSchemaBlackListOnTargetDataNode(
@@ -315,10 +341,10 @@ public class DeleteTimeSeriesProcedure
return patternTree;
}
- private void deleteTimeSeries(ConfigNodeProcedureEnv env) {
+ private void deleteTimeSeriesSchema(ConfigNodeProcedureEnv env) {
RegionTask<TSStatus> deleteTimeSeriesTask =
new RegionTask<TSStatus>(
- "delete timeseries",
+ "delete timeseries schema",
env,
env.getConfigManager().getRelatedSchemaRegionGroup(patternTree)) {
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
index 1cf6e7e9e5..5f998f2975 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
@@ -23,5 +23,5 @@ public enum DeleteTimeSeriesState {
CONSTRUCT_BLACK_LIST,
CLEAN_DATANODE_SCHEMA_CACHE,
DELETE_DATA,
- DELETE_TIMESERIES
+ DELETE_TIMESERIES_SCHEMA
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 3e21d0b38d..3122741e43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -103,6 +103,15 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
}
}
+ public boolean hasWildcard() {
+ for (String node : nodes) {
+ if (ONE_LEVEL_PATH_WILDCARD.equals(node) ||
MULTI_LEVEL_PATH_WILDCARD.equals(node)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* it will return a new partial path
*
@@ -617,6 +626,15 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
return new PartialPath(Arrays.copyOf(nodes, nodes.length - 1));
}
+ public List<PartialPath> getDevicePathPattern() {
+ List<PartialPath> result = new ArrayList<>();
+ result.add(getDevicePath());
+ if (nodes[nodes.length - 1].equals(MULTI_LEVEL_PATH_WILDCARD)) {
+ result.add(new PartialPath(nodes));
+ }
+ return result;
+ }
+
@TestOnly
public Path toTSFilePath() {
return new Path(getDevice(), getMeasurement());
diff --git
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 3575e027c9..1a5ba6715e 100644
---
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -693,7 +693,7 @@ public class RSchemaRegion implements ISchemaRegion {
}
@Override
- public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+ public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
throws MetadataException {
throw new UnsupportedOperationException();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 758786b544..05938e4d74 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushStatus;
@@ -165,7 +166,6 @@ public abstract class AbstractMemTable implements IMemTable
{
insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
}
- updatePlanIndexes(insertRowPlan.getIndex());
String[] measurements = insertRowPlan.getMeasurements();
Object[] values = insertRowPlan.getValues();
@@ -212,12 +212,9 @@ public abstract class AbstractMemTable implements
IMemTable {
// if this insert plan isn't from storage engine (mainly from test), we
should set a temp device
// id for it
if (insertRowNode.getDeviceID() == null) {
- insertRowNode.setDeviceID(
-
DeviceIDFactory.getInstance().getDeviceID(insertRowNode.getDevicePath()));
+
insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
}
- // updatePlanIndexes(insertRowNode.getIndex());
- updatePlanIndexes(0);
String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
@@ -266,7 +263,6 @@ public abstract class AbstractMemTable implements IMemTable
{
insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
}
- updatePlanIndexes(insertRowPlan.getIndex());
String[] measurements = insertRowPlan.getMeasurements();
Object[] values = insertRowPlan.getValues();
@@ -307,8 +303,6 @@ public abstract class AbstractMemTable implements IMemTable
{
insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
}
- // TODO updatePlanIndexes(insertRowNode.getIndex());
- updatePlanIndexes(0);
String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -344,7 +338,6 @@ public abstract class AbstractMemTable implements IMemTable
{
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int
end)
throws WriteProcessException {
- updatePlanIndexes(insertTabletPlan.getIndex());
try {
write(insertTabletPlan, start, end);
memSize += MemUtils.getTabletSize(insertTabletPlan, start, end,
disableMemControl);
@@ -367,7 +360,6 @@ public abstract class AbstractMemTable implements IMemTable
{
@Override
public void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int
start, int end)
throws WriteProcessException {
- updatePlanIndexes(insertTabletPlan.getIndex());
try {
writeAlignedTablet(insertTabletPlan, start, end);
memSize += MemUtils.getAlignedTabletSize(insertTabletPlan, start, end,
disableMemControl);
@@ -390,9 +382,6 @@ public abstract class AbstractMemTable implements IMemTable
{
@Override
public void insertTablet(InsertTabletNode insertTabletNode, int start, int
end)
throws WriteProcessException {
- // TODO: PlanIndex
- // updatePlanIndexes(insertTabletPlan.getIndex());
- updatePlanIndexes(0);
try {
write(insertTabletNode, start, end);
memSize += MemUtils.getTabletSize(insertTabletNode, start, end,
disableMemControl);
@@ -415,9 +404,6 @@ public abstract class AbstractMemTable implements IMemTable
{
@Override
public void insertAlignedTablet(InsertTabletNode insertTabletNode, int
start, int end)
throws WriteProcessException {
- // TODO: PlanIndex
- // updatePlanIndexes(insertTabletPlan.getIndex());
- updatePlanIndexes(0);
try {
writeAlignedTablet(insertTabletNode, start, end);
memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end,
disableMemControl);
@@ -495,8 +481,7 @@ public abstract class AbstractMemTable implements IMemTable
{
public void write(InsertTabletNode insertTabletNode, int start, int end) {
// if this insert plan isn't from storage engine, we should set a temp
device id for it
if (insertTabletNode.getDeviceID() == null) {
- insertTabletNode.setDeviceID(
-
DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
+
insertTabletNode.setDeviceID(deviceIDFactory.getDeviceID(insertTabletNode.getDevicePath()));
}
List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -554,8 +539,7 @@ public abstract class AbstractMemTable implements IMemTable
{
public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start,
int end) {
// if this insert plan isn't from storage engine, we should set a temp
device id for it
if (insertTabletNode.getDeviceID() == null) {
- insertTabletNode.setDeviceID(
-
DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
+
insertTabletNode.setDeviceID(deviceIDFactory.getDeviceID(insertTabletNode.getDevicePath()));
}
List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -654,17 +638,55 @@ public abstract class AbstractMemTable implements
IMemTable {
.getReadOnlyMemChunkFromMemTable(this, modsToMemtable, ttlLowerBound);
}
+ /**
+ * @param originalPath the original path pattern or full path to be used to
match timeseries, e.g.
+ * root.sg.**, root.sg.*.s, root.sg.d.s
+ * @param devicePath one of the device path patterns generated by original
path, e.g. given
+ * original path root.sg.** and the device path may be root.sg or
root.sg.**
+ * @param startTimestamp the lower-bound of deletion time.
+ * @param endTimestamp the upper-bound of deletion time
+ */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void delete(
PartialPath originalPath, PartialPath devicePath, long startTimestamp,
long endTimestamp) {
- IWritableMemChunkGroup memChunkGroup =
memTableMap.get(getDeviceID(devicePath));
- if (memChunkGroup == null) {
- return;
+ if (devicePath.hasWildcard()) {
+ // In cluster mode without IDTable, the input devicePath may be a
devicePathPattern
+ List<Pair<PartialPath, IWritableMemChunkGroup>> targetDeviceList = new
ArrayList<>();
+ for (Entry<IDeviceID, IWritableMemChunkGroup> entry :
memTableMap.entrySet()) {
+ try {
+ PartialPath devicePathInMemTable = new
PartialPath(entry.getKey().toStringID());
+ if (devicePath.matchFullPath(devicePathInMemTable)) {
+ targetDeviceList.add(new Pair<>(devicePathInMemTable,
entry.getValue()));
+ }
+ } catch (IllegalPathException e) {
+ // won't reach here
+ }
+ }
+
+ for (Pair<PartialPath, IWritableMemChunkGroup> targetDevice :
targetDeviceList) {
+ deleteDataInChunkGroup(
+ targetDevice.right, originalPath, targetDevice.left,
startTimestamp, endTimestamp);
+ }
+ } else {
+ IWritableMemChunkGroup memChunkGroup =
+ memTableMap.get(deviceIDFactory.getDeviceID(devicePath));
+ if (memChunkGroup == null) {
+ return;
+ }
+ deleteDataInChunkGroup(memChunkGroup, originalPath, devicePath,
startTimestamp, endTimestamp);
}
+ }
+
+ private void deleteDataInChunkGroup(
+ IWritableMemChunkGroup memChunkGroup,
+ PartialPath originalPath,
+ PartialPath devicePath,
+ long startTimestamp,
+ long endTimestamp) {
totalPointsNum -= memChunkGroup.delete(originalPath, devicePath,
startTimestamp, endTimestamp);
if (memChunkGroup.getMemChunkMap().isEmpty()) {
- memTableMap.remove(getDeviceID(devicePath));
+ memTableMap.remove(deviceIDFactory.getDeviceID(devicePath));
}
}
@@ -720,11 +742,6 @@ public abstract class AbstractMemTable implements
IMemTable {
return minPlanIndex;
}
- void updatePlanIndexes(long index) {
- maxPlanIndex = Math.max(index, maxPlanIndex);
- minPlanIndex = Math.min(index, minPlanIndex);
- }
-
@Override
public long getMemTableId() {
return memTableId;
@@ -745,10 +762,6 @@ public abstract class AbstractMemTable implements
IMemTable {
this.flushStatus = flushStatus;
}
- private IDeviceID getDeviceID(PartialPath deviceId) {
- return deviceIDFactory.getDeviceID(deviceId);
- }
-
/** Notice: this method is concurrent unsafe */
@Override
public int serializedSize() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 6a3084835c..25856adf27 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -34,6 +34,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
private AlignedWritableMemChunk memChunk;
@@ -99,9 +102,10 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
int deletedPointsNumber = 0;
Set<String> measurements = memChunk.getAllMeasurements();
List<String> columnsToBeRemoved = new ArrayList<>();
- for (String measurement : measurements) {
- PartialPath fullPath = devicePath.concatNode(measurement);
- if (originalPath.matchFullPath(fullPath)) {
+ String targetMeasurement = originalPath.getMeasurement();
+ if (targetMeasurement.equals(ONE_LEVEL_PATH_WILDCARD)
+ || targetMeasurement.equals(MULTI_LEVEL_PATH_WILDCARD)) {
+ for (String measurement : measurements) {
Pair<Integer, Boolean> deleteInfo =
memChunk.deleteDataFromAColumn(startTimestamp, endTimestamp,
measurement);
deletedPointsNumber += deleteInfo.left;
@@ -109,7 +113,17 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
columnsToBeRemoved.add(measurement);
}
}
+ } else {
+ if (measurements.contains(targetMeasurement)) {
+ Pair<Integer, Boolean> deleteInfo =
+ memChunk.deleteDataFromAColumn(startTimestamp, endTimestamp,
targetMeasurement);
+ deletedPointsNumber += deleteInfo.left;
+ if (Boolean.TRUE.equals(deleteInfo.right)) {
+ columnsToBeRemoved.add(targetMeasurement);
+ }
+ }
}
+
for (String columnToBeRemoved : columnsToBeRemoved) {
memChunk.removeColumn(columnToBeRemoved);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 75e24d4fb6..128a3d6403 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -34,6 +34,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
public class WritableMemChunkGroup implements IWritableMemChunkGroup {
private Map<String, IWritableMemChunk> memChunkMap;
@@ -117,21 +120,28 @@ public class WritableMemChunkGroup implements
IWritableMemChunkGroup {
public int delete(
PartialPath originalPath, PartialPath devicePath, long startTimestamp,
long endTimestamp) {
int deletedPointsNumber = 0;
- Iterator<Entry<String, IWritableMemChunk>> iter =
memChunkMap.entrySet().iterator();
- while (iter.hasNext()) {
- Entry<String, IWritableMemChunk> entry = iter.next();
- IWritableMemChunk chunk = entry.getValue();
- // the key is measurement rather than component of multiMeasurement
- PartialPath fullPath = devicePath.concatNode(entry.getKey());
- if (originalPath.matchFullPath(fullPath)) {
- // matchFullPath ensures this branch could work on delete data of
unary or multi measurement
- // and delete timeseries
+ String targetMeasurement = originalPath.getMeasurement();
+ if (targetMeasurement.equals(ONE_LEVEL_PATH_WILDCARD)
+ || targetMeasurement.equals(MULTI_LEVEL_PATH_WILDCARD)) {
+ Iterator<Entry<String, IWritableMemChunk>> iter =
memChunkMap.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String, IWritableMemChunk> entry = iter.next();
+ IWritableMemChunk chunk = entry.getValue();
if (startTimestamp == Long.MIN_VALUE && endTimestamp ==
Long.MAX_VALUE) {
iter.remove();
}
deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
}
+ } else {
+ IWritableMemChunk chunk = memChunkMap.get(targetMeasurement);
+ if (chunk != null) {
+ if (startTimestamp == Long.MIN_VALUE && endTimestamp ==
Long.MAX_VALUE) {
+ memChunkMap.remove(targetMeasurement);
+ }
+ deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+ }
}
+
return deletedPointsNumber;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 37395e1207..1543c18ec9 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -122,6 +122,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -2212,8 +2213,7 @@ public class DataRegion {
try {
- PartialPath devicePath = pattern.getDevicePath();
- Set<PartialPath> devicePaths = Collections.singleton(devicePath);
+ Set<PartialPath> devicePaths = new
HashSet<>(pattern.getDevicePathPattern());
// delete Last cache record if necessary
// todo implement more precise process
@@ -2337,21 +2337,32 @@ public class DataRegion {
}
for (PartialPath device : devicePaths) {
- String deviceId = device.getFullPath();
- if (!tsFileResource.mayContainsDevice(deviceId)) {
- // resource does not contain this device
- continue;
+ long deviceStartTime, deviceEndTime;
+ if (device.hasWildcard()) {
+ Pair<Long, Long> startAndEndTime =
tsFileResource.getPossibleStartTimeAndEndTime(device);
+ if (startAndEndTime == null) {
+ continue;
+ }
+ deviceStartTime = startAndEndTime.getLeft();
+ deviceEndTime = startAndEndTime.getRight();
+ } else {
+ String deviceId = device.getFullPath();
+ if (!tsFileResource.mayContainsDevice(deviceId)) {
+ // resource does not contain this device
+ continue;
+ }
+ deviceStartTime = tsFileResource.getStartTime(deviceId);
+ deviceEndTime = tsFileResource.getEndTime(deviceId);
}
- long deviceEndTime = tsFileResource.getEndTime(deviceId);
if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
// unsealed seq file
- if (deleteEnd >= tsFileResource.getStartTime(deviceId)) {
+ if (deleteEnd >= deviceStartTime) {
return false;
}
} else {
// sealed file or unsealed unseq file
- if (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart
<= deviceEndTime) {
+ if (deleteEnd >= deviceStartTime && deleteStart <= deviceEndTime) {
// time range of device has overlap with the deletion
return false;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 050f50cf92..e67203d242 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -428,6 +429,14 @@ public class TsFileResource {
return timeIndex.mayContainsDevice(device);
}
+ /**
+ * Get the min start time and max end time of devices matched by given
devicePattern. If there's
+ * no device matched by given pattern, return null.
+ */
+ public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath
devicePattern) {
+ return timeIndex.getPossibleStartTimeAndEndTime(devicePattern);
+ }
+
public boolean isClosed() {
return this.status != TsFileResourceStatus.UNCLOSED;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 5d75391cea..9e7daf659c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -337,4 +340,28 @@ public class DeviceTimeIndex implements ITimeIndex {
public boolean mayContainsDevice(String device) {
return deviceToIndex.containsKey(device);
}
+
+ @Override
+ public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath
devicePattern) {
+ boolean hasMatchedDevice = false;
+ long startTime = Long.MAX_VALUE;
+ long endTime = Long.MIN_VALUE;
+ for (Entry<String, Integer> entry : deviceToIndex.entrySet()) {
+ try {
+ if (devicePattern.matchFullPath(new PartialPath(entry.getKey()))) {
+ hasMatchedDevice = true;
+ if (startTimes[entry.getValue()] < startTime) {
+ startTime = startTimes[entry.getValue()];
+ }
+ if (endTimes[entry.getValue()] > endTime) {
+ endTime = endTimes[entry.getValue()];
+ }
+ }
+ } catch (IllegalPathException e) {
+ // won't reach here
+ }
+ }
+
+ return hasMatchedDevice ? new Pair<>(startTime, endTime) : null;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index a1e9827855..9129ce240d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -224,4 +226,9 @@ public class FileTimeIndex implements ITimeIndex {
public boolean mayContainsDevice(String device) {
return true;
}
+
+ @Override
+ public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath
devicePattern) {
+ return new Pair<>(startTime, endTime);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index b4e127bd3b..e3dd93685b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.io.InputStream;
@@ -186,4 +188,6 @@ public interface ITimeIndex {
* it may or may not contain this device
*/
boolean mayContainsDevice(String device);
+
+ Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
index 03abe86fa6..9443f8c977 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -173,4 +175,10 @@ public class V012FileTimeIndex implements ITimeIndex {
throw new UnsupportedOperationException(
"V012FileTimeIndex should be rewritten while upgrading and
containsDevice() method should not be called any more.");
}
+
+ @Override
+ public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath
devicePattern) {
+ throw new UnsupportedOperationException(
+ "V012FileTimeIndex should be rewritten while upgrading and
getPossibleStartTimeAndEndTime() method should not be called any more.");
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index e235871255..f316960b2d 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -451,7 +451,7 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
&& node.getChildren().isEmpty();
}
- public List<PartialPath> getPreDeleteTimeseries(PartialPath pathPattern)
+ public List<PartialPath> getPreDeletedTimeseries(PartialPath pathPattern)
throws MetadataException {
List<PartialPath> result = new LinkedList<>();
MeasurementCollector<List<PartialPath>> collector =
@@ -469,6 +469,27 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
return result;
}
+ /**
+ * Get all the devices of pre-deleted timeseries matched by given
pathPattern. For example, given
+ * path pattern root.sg.*.s1 and pre-deleted timeseries root.sg.d1.s1,
root.sg.d2.s1, then the
+ * result set is {root.sg.d1, root.sg.d2}.
+ */
+ public Set<PartialPath> getDevicesOfPreDeletedTimeseries(PartialPath
pathPattern)
+ throws MetadataException {
+ Set<PartialPath> result = new HashSet<>();
+ MeasurementCollector<List<PartialPath>> collector =
+ new MeasurementCollector<List<PartialPath>>(storageGroupMNode,
pathPattern, store) {
+ @Override
+ protected void collectMeasurement(IMeasurementMNode node) throws
MetadataException {
+ if (node.isPreDeleted()) {
+ result.add(getCurrentPartialPath(node).getDevicePath());
+ }
+ }
+ };
+ collector.traverse();
+ return result;
+ }
+
@Override
public void setAlias(IMeasurementMNode measurementMNode, String alias)
throws MetadataException {
store.setAlias(measurementMNode, alias);
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index c70e2befd4..3a146ad3e3 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -138,7 +138,7 @@ public interface ISchemaRegion {
*/
void rollbackSchemaBlackList(PathPatternTree patternTree) throws
MetadataException;
- List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree) throws
MetadataException;
+ Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree) throws
MetadataException;
/**
* Delete timeseries in schema black list.
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index d0a26368e8..8f17757cc0 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -884,19 +884,21 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
@Override
- public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+ public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
throws MetadataException {
- List<PartialPath> pathList = new ArrayList<>();
+ Set<PartialPath> deviceBasedPathPatternSet = new HashSet<>();
for (PartialPath pathPattern : patternTree.getAllPathPatterns()) {
- pathList.addAll(mtree.getPreDeleteTimeseries(pathPattern));
+ for (PartialPath devicePath :
mtree.getDevicesOfPreDeletedTimeseries(pathPattern)) {
+
deviceBasedPathPatternSet.addAll(pathPattern.alterPrefixPath(devicePath));
+ }
}
- return pathList;
+ return deviceBasedPathPatternSet;
}
@Override
public void deleteTimeseriesInBlackList(PathPatternTree patternTree) throws
MetadataException {
for (PartialPath pathPattern : patternTree.getAllPathPatterns()) {
- for (PartialPath path : mtree.getPreDeleteTimeseries(pathPattern)) {
+ for (PartialPath path : mtree.getPreDeletedTimeseries(pathPattern)) {
try {
deleteSingleTimeseriesInBlackList(path);
writeToMLog(new
DeleteTimeSeriesPlan(Collections.singletonList(path)));
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index e54f91fc33..5ee3512d29 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -766,7 +766,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
@Override
- public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+ public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
throws MetadataException {
throw new UnsupportedOperationException();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9670882ea4..e99eab0125 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -410,11 +410,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
cache.takeWriteLock();
try {
- for (PartialPath pathPattern :
-
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()))
- .getAllPathPatterns()) {
- cache.invalidateMatchedSchema(pathPattern);
- }
+ // todo implement precise timeseries clean rather than clean all
+ cache.cleanUp();
} finally {
cache.releaseWriteLock();
}