This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 10611563ae [IOTDB-4448] Implement pattern based data deletion in 
DataRegion to speed up deleting timeseries (#7364)
10611563ae is described below

commit 10611563ae337fc52c024e99d99e8b943082083f
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Sep 29 13:13:42 2022 +0800

    [IOTDB-4448] Implement pattern based data deletion in DataRegion to speed 
up deleting timeseries (#7364)
    
    [IOTDB-4448] Implement pattern based data deletion in DataRegion to speed 
up deleting timeseries (#7364)
---
 .../procedure/impl/DeleteTimeSeriesProcedure.java  | 144 ++++++++++++---------
 .../procedure/state/DeleteTimeSeriesState.java     |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  18 +++
 .../schemaregion/rocksdb/RSchemaRegion.java        |   2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  79 ++++++-----
 .../memtable/AlignedWritableMemChunkGroup.java     |  20 ++-
 .../db/engine/memtable/WritableMemChunkGroup.java  |  28 ++--
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  29 +++--
 .../db/engine/storagegroup/TsFileResource.java     |   9 ++
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  27 ++++
 .../storagegroup/timeindex/FileTimeIndex.java      |   7 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   4 +
 .../storagegroup/timeindex/V012FileTimeIndex.java  |   8 ++
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  23 +++-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   2 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       |  12 +-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +-
 18 files changed, 295 insertions(+), 128 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
index c9be7cd1a9..57f2b776c5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteTimeSeriesProcedure.java
@@ -94,38 +94,46 @@ public class DeleteTimeSeriesProcedure
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, 
DeleteTimeSeriesState state)
       throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
-    switch (state) {
-      case CONSTRUCT_BLACK_LIST:
-        LOGGER.info("Construct schema black list of timeseries {}", 
requestMessage);
-        if (constructBlackList(env) > 0) {
-          setNextState(DeleteTimeSeriesState.CLEAN_DATANODE_SCHEMA_CACHE);
+    long startTime = System.currentTimeMillis();
+    try {
+      switch (state) {
+        case CONSTRUCT_BLACK_LIST:
+          LOGGER.info("Construct schema black list of timeseries {}", 
requestMessage);
+          if (constructBlackList(env) > 0) {
+            setNextState(DeleteTimeSeriesState.CLEAN_DATANODE_SCHEMA_CACHE);
+            break;
+          } else {
+            setFailure(
+                new ProcedureException(
+                    new PathNotExistException(
+                        patternTree.getAllPathPatterns().stream()
+                            .map(PartialPath::getFullPath)
+                            .collect(Collectors.toList()))));
+            return Flow.NO_MORE_STATE;
+          }
+        case CLEAN_DATANODE_SCHEMA_CACHE:
+          LOGGER.info("Invalidate cache of timeseries {}", requestMessage);
+          invalidateCache(env);
+          break;
+        case DELETE_DATA:
+          LOGGER.info("Delete data of timeseries {}", requestMessage);
+          deleteData(env);
           break;
-        } else {
-          setFailure(
-              new ProcedureException(
-                  new PathNotExistException(
-                      patternTree.getAllPathPatterns().stream()
-                          .map(PartialPath::getFullPath)
-                          .collect(Collectors.toList()))));
+        case DELETE_TIMESERIES_SCHEMA:
+          LOGGER.info("Delete timeseries schema of {}", requestMessage);
+          deleteTimeSeriesSchema(env);
           return Flow.NO_MORE_STATE;
-        }
-      case CLEAN_DATANODE_SCHEMA_CACHE:
-        LOGGER.info("Invalidate cache of timeseries {}", requestMessage);
-        invalidateCache(env);
-        break;
-      case DELETE_DATA:
-        LOGGER.info("Delete data of timeseries {}", requestMessage);
-        deleteData(env);
-        break;
-      case DELETE_TIMESERIES:
-        LOGGER.info("Delete timeseries schema of {}", requestMessage);
-        deleteTimeSeries(env);
-        return Flow.NO_MORE_STATE;
-      default:
-        setFailure(new ProcedureException("Unrecognized state " + 
state.toString()));
-        return Flow.NO_MORE_STATE;
+        default:
+          setFailure(new ProcedureException("Unrecognized state " + 
state.toString()));
+          return Flow.NO_MORE_STATE;
+      }
+      return Flow.HAS_MORE_STATE;
+    } finally {
+      LOGGER.info(
+          String.format(
+              "DeleteTimeSeries-[%s] costs %sms",
+              state.toString(), (System.currentTimeMillis() - startTime)));
     }
-    return Flow.HAS_MORE_STATE;
   }
 
   // return the total num of timeseries in schema black list
@@ -195,6 +203,19 @@ public class DeleteTimeSeriesProcedure
   }
 
   private void deleteData(ConfigNodeProcedureEnv env) {
+    deleteDataWithRawPathPattern(env);
+  }
+
+  private void deleteDataWithRawPathPattern(ConfigNodeProcedureEnv env) {
+    executeDeleteData(env, patternTree);
+    if (isFailed()) {
+      return;
+    }
+    setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES_SCHEMA);
+  }
+
+  // todo this will be used in IDTable scenarios
+  private void deleteDataWithResolvedPath(ConfigNodeProcedureEnv env) {
     Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
         env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
     Map<TDataNodeLocation, List<TConsensusGroupId>> 
dataNodeSchemaRegionGroupGroupIdMap =
@@ -233,39 +254,44 @@ public class DeleteTimeSeriesProcedure
         continue;
       }
 
-      Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
-          env.getConfigManager().getRelatedDataRegionGroup(patternTree);
-
-      // target timeseries has no data
-      if (relatedDataRegionGroup.isEmpty()) {
-        continue;
-      }
+      executeDeleteData(env, patternTree);
 
-      RegionTask<TSStatus> deleteDataTask =
-          new RegionTask<TSStatus>("delete data", env, relatedDataRegionGroup) 
{
-            @Override
-            Map<Integer, TSStatus> sendRequest(
-                TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> 
consensusGroupIdList) {
-              Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
HashMap<>();
-              dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), 
dataNodeLocation);
-              DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
-                  new 
DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
-              AsyncDataNodeClientPool.getInstance()
-                  .sendAsyncRequestToDataNodeWithRetry(
-                      new TDeleteDataForDeleteTimeSeriesReq(
-                          new ArrayList<>(consensusGroupIdList),
-                          preparePatternTreeBytesData(patternTree)),
-                      dataNodeTask);
-              return dataNodeTask.getDataNodeResponseMap();
-            }
-          };
-      deleteDataTask.setExecuteOnAllReplicaset(true);
-      deleteDataTask.execute();
       if (isFailed()) {
         return;
       }
     }
-    setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES);
+    setNextState(DeleteTimeSeriesState.DELETE_TIMESERIES_SCHEMA);
+  }
+
+  private void executeDeleteData(ConfigNodeProcedureEnv env, PathPatternTree 
patternTree) {
+    Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
+        env.getConfigManager().getRelatedDataRegionGroup(patternTree);
+
+    // target timeseries has no data
+    if (relatedDataRegionGroup.isEmpty()) {
+      return;
+    }
+
+    RegionTask<TSStatus> deleteDataTask =
+        new RegionTask<TSStatus>("delete data", env, relatedDataRegionGroup) {
+          @Override
+          Map<Integer, TSStatus> sendRequest(
+              TDataNodeLocation dataNodeLocation, List<TConsensusGroupId> 
consensusGroupIdList) {
+            Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
HashMap<>();
+            dataNodeLocationMap.put(dataNodeLocation.getDataNodeId(), 
dataNodeLocation);
+            DeleteDataForDeleteTimeSeriesDataNodeTask dataNodeTask =
+                new 
DeleteDataForDeleteTimeSeriesDataNodeTask(dataNodeLocationMap);
+            AsyncDataNodeClientPool.getInstance()
+                .sendAsyncRequestToDataNodeWithRetry(
+                    new TDeleteDataForDeleteTimeSeriesReq(
+                        new ArrayList<>(consensusGroupIdList),
+                        preparePatternTreeBytesData(patternTree)),
+                    dataNodeTask);
+            return dataNodeTask.getDataNodeResponseMap();
+          }
+        };
+    deleteDataTask.setExecuteOnAllReplicaset(true);
+    deleteDataTask.execute();
   }
 
   private PathPatternTree fetchSchemaBlackListOnTargetDataNode(
@@ -315,10 +341,10 @@ public class DeleteTimeSeriesProcedure
     return patternTree;
   }
 
-  private void deleteTimeSeries(ConfigNodeProcedureEnv env) {
+  private void deleteTimeSeriesSchema(ConfigNodeProcedureEnv env) {
     RegionTask<TSStatus> deleteTimeSeriesTask =
         new RegionTask<TSStatus>(
-            "delete timeseries",
+            "delete timeseries schema",
             env,
             env.getConfigManager().getRelatedSchemaRegionGroup(patternTree)) {
           @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
index 1cf6e7e9e5..5f998f2975 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteTimeSeriesState.java
@@ -23,5 +23,5 @@ public enum DeleteTimeSeriesState {
   CONSTRUCT_BLACK_LIST,
   CLEAN_DATANODE_SCHEMA_CACHE,
   DELETE_DATA,
-  DELETE_TIMESERIES
+  DELETE_TIMESERIES_SCHEMA
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 3e21d0b38d..3122741e43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -103,6 +103,15 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
     }
   }
 
+  public boolean hasWildcard() {
+    for (String node : nodes) {
+      if (ONE_LEVEL_PATH_WILDCARD.equals(node) || 
MULTI_LEVEL_PATH_WILDCARD.equals(node)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * it will return a new partial path
    *
@@ -617,6 +626,15 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
     return new PartialPath(Arrays.copyOf(nodes, nodes.length - 1));
   }
 
+  public List<PartialPath> getDevicePathPattern() {
+    List<PartialPath> result = new ArrayList<>();
+    result.add(getDevicePath());
+    if (nodes[nodes.length - 1].equals(MULTI_LEVEL_PATH_WILDCARD)) {
+      result.add(new PartialPath(nodes));
+    }
+    return result;
+  }
+
   @TestOnly
   public Path toTSFilePath() {
     return new Path(getDevice(), getMeasurement());
diff --git 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 3575e027c9..1a5ba6715e 100644
--- 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -693,7 +693,7 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+  public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 758786b544..05938e4d74 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushStatus;
@@ -165,7 +166,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
       
insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
     }
 
-    updatePlanIndexes(insertRowPlan.getIndex());
     String[] measurements = insertRowPlan.getMeasurements();
     Object[] values = insertRowPlan.getValues();
 
@@ -212,12 +212,9 @@ public abstract class AbstractMemTable implements 
IMemTable {
     // if this insert plan isn't from storage engine (mainly from test), we 
should set a temp device
     // id for it
     if (insertRowNode.getDeviceID() == null) {
-      insertRowNode.setDeviceID(
-          
DeviceIDFactory.getInstance().getDeviceID(insertRowNode.getDevicePath()));
+      
insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
     }
 
-    // updatePlanIndexes(insertRowNode.getIndex());
-    updatePlanIndexes(0);
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
 
@@ -266,7 +263,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
       
insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
     }
 
-    updatePlanIndexes(insertRowPlan.getIndex());
     String[] measurements = insertRowPlan.getMeasurements();
     Object[] values = insertRowPlan.getValues();
 
@@ -307,8 +303,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
       
insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
     }
 
-    // TODO updatePlanIndexes(insertRowNode.getIndex());
-    updatePlanIndexes(0);
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
     List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -344,7 +338,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int 
end)
       throws WriteProcessException {
-    updatePlanIndexes(insertTabletPlan.getIndex());
     try {
       write(insertTabletPlan, start, end);
       memSize += MemUtils.getTabletSize(insertTabletPlan, start, end, 
disableMemControl);
@@ -367,7 +360,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int 
start, int end)
       throws WriteProcessException {
-    updatePlanIndexes(insertTabletPlan.getIndex());
     try {
       writeAlignedTablet(insertTabletPlan, start, end);
       memSize += MemUtils.getAlignedTabletSize(insertTabletPlan, start, end, 
disableMemControl);
@@ -390,9 +382,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public void insertTablet(InsertTabletNode insertTabletNode, int start, int 
end)
       throws WriteProcessException {
-    // TODO: PlanIndex
-    // updatePlanIndexes(insertTabletPlan.getIndex());
-    updatePlanIndexes(0);
     try {
       write(insertTabletNode, start, end);
       memSize += MemUtils.getTabletSize(insertTabletNode, start, end, 
disableMemControl);
@@ -415,9 +404,6 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public void insertAlignedTablet(InsertTabletNode insertTabletNode, int 
start, int end)
       throws WriteProcessException {
-    // TODO: PlanIndex
-    // updatePlanIndexes(insertTabletPlan.getIndex());
-    updatePlanIndexes(0);
     try {
       writeAlignedTablet(insertTabletNode, start, end);
       memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, 
disableMemControl);
@@ -495,8 +481,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
   public void write(InsertTabletNode insertTabletNode, int start, int end) {
     // if this insert plan isn't from storage engine, we should set a temp 
device id for it
     if (insertTabletNode.getDeviceID() == null) {
-      insertTabletNode.setDeviceID(
-          
DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
+      
insertTabletNode.setDeviceID(deviceIDFactory.getDeviceID(insertTabletNode.getDevicePath()));
     }
 
     List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -554,8 +539,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
   public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, 
int end) {
     // if this insert plan isn't from storage engine, we should set a temp 
device id for it
     if (insertTabletNode.getDeviceID() == null) {
-      insertTabletNode.setDeviceID(
-          
DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
+      
insertTabletNode.setDeviceID(deviceIDFactory.getDeviceID(insertTabletNode.getDevicePath()));
     }
 
     List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -654,17 +638,55 @@ public abstract class AbstractMemTable implements 
IMemTable {
         .getReadOnlyMemChunkFromMemTable(this, modsToMemtable, ttlLowerBound);
   }
 
+  /**
+   * @param originalPath the original path pattern or full path to be used to 
match timeseries, e.g.
+   *     root.sg.**, root.sg.*.s, root.sg.d.s
+   * @param devicePath one of the device path patterns generated by original 
path, e.g. given
+   *     original path root.sg.** and the device path may be root.sg or 
root.sg.**
+   * @param startTimestamp the lower-bound of deletion time.
+   * @param endTimestamp the upper-bound of deletion time
+   */
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
   public void delete(
       PartialPath originalPath, PartialPath devicePath, long startTimestamp, 
long endTimestamp) {
-    IWritableMemChunkGroup memChunkGroup = 
memTableMap.get(getDeviceID(devicePath));
-    if (memChunkGroup == null) {
-      return;
+    if (devicePath.hasWildcard()) {
+      // In cluster mode without IDTable, the input devicePath may be a 
devicePathPattern
+      List<Pair<PartialPath, IWritableMemChunkGroup>> targetDeviceList = new 
ArrayList<>();
+      for (Entry<IDeviceID, IWritableMemChunkGroup> entry : 
memTableMap.entrySet()) {
+        try {
+          PartialPath devicePathInMemTable = new 
PartialPath(entry.getKey().toStringID());
+          if (devicePath.matchFullPath(devicePathInMemTable)) {
+            targetDeviceList.add(new Pair<>(devicePathInMemTable, 
entry.getValue()));
+          }
+        } catch (IllegalPathException e) {
+          // won't reach here
+        }
+      }
+
+      for (Pair<PartialPath, IWritableMemChunkGroup> targetDevice : 
targetDeviceList) {
+        deleteDataInChunkGroup(
+            targetDevice.right, originalPath, targetDevice.left, 
startTimestamp, endTimestamp);
+      }
+    } else {
+      IWritableMemChunkGroup memChunkGroup =
+          memTableMap.get(deviceIDFactory.getDeviceID(devicePath));
+      if (memChunkGroup == null) {
+        return;
+      }
+      deleteDataInChunkGroup(memChunkGroup, originalPath, devicePath, 
startTimestamp, endTimestamp);
     }
+  }
+
+  private void deleteDataInChunkGroup(
+      IWritableMemChunkGroup memChunkGroup,
+      PartialPath originalPath,
+      PartialPath devicePath,
+      long startTimestamp,
+      long endTimestamp) {
     totalPointsNum -= memChunkGroup.delete(originalPath, devicePath, 
startTimestamp, endTimestamp);
     if (memChunkGroup.getMemChunkMap().isEmpty()) {
-      memTableMap.remove(getDeviceID(devicePath));
+      memTableMap.remove(deviceIDFactory.getDeviceID(devicePath));
     }
   }
 
@@ -720,11 +742,6 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return minPlanIndex;
   }
 
-  void updatePlanIndexes(long index) {
-    maxPlanIndex = Math.max(index, maxPlanIndex);
-    minPlanIndex = Math.min(index, minPlanIndex);
-  }
-
   @Override
   public long getMemTableId() {
     return memTableId;
@@ -745,10 +762,6 @@ public abstract class AbstractMemTable implements 
IMemTable {
     this.flushStatus = flushStatus;
   }
 
-  private IDeviceID getDeviceID(PartialPath deviceId) {
-    return deviceIDFactory.getDeviceID(deviceId);
-  }
-
   /** Notice: this method is concurrent unsafe */
   @Override
   public int serializedSize() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 6a3084835c..25856adf27 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -34,6 +34,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
 public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
 
   private AlignedWritableMemChunk memChunk;
@@ -99,9 +102,10 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
     int deletedPointsNumber = 0;
     Set<String> measurements = memChunk.getAllMeasurements();
     List<String> columnsToBeRemoved = new ArrayList<>();
-    for (String measurement : measurements) {
-      PartialPath fullPath = devicePath.concatNode(measurement);
-      if (originalPath.matchFullPath(fullPath)) {
+    String targetMeasurement = originalPath.getMeasurement();
+    if (targetMeasurement.equals(ONE_LEVEL_PATH_WILDCARD)
+        || targetMeasurement.equals(MULTI_LEVEL_PATH_WILDCARD)) {
+      for (String measurement : measurements) {
         Pair<Integer, Boolean> deleteInfo =
             memChunk.deleteDataFromAColumn(startTimestamp, endTimestamp, 
measurement);
         deletedPointsNumber += deleteInfo.left;
@@ -109,7 +113,17 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
           columnsToBeRemoved.add(measurement);
         }
       }
+    } else {
+      if (measurements.contains(targetMeasurement)) {
+        Pair<Integer, Boolean> deleteInfo =
+            memChunk.deleteDataFromAColumn(startTimestamp, endTimestamp, 
targetMeasurement);
+        deletedPointsNumber += deleteInfo.left;
+        if (Boolean.TRUE.equals(deleteInfo.right)) {
+          columnsToBeRemoved.add(targetMeasurement);
+        }
+      }
     }
+
     for (String columnToBeRemoved : columnsToBeRemoved) {
       memChunk.removeColumn(columnToBeRemoved);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 75e24d4fb6..128a3d6403 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -34,6 +34,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
 public class WritableMemChunkGroup implements IWritableMemChunkGroup {
 
   private Map<String, IWritableMemChunk> memChunkMap;
@@ -117,21 +120,28 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
   public int delete(
       PartialPath originalPath, PartialPath devicePath, long startTimestamp, 
long endTimestamp) {
     int deletedPointsNumber = 0;
-    Iterator<Entry<String, IWritableMemChunk>> iter = 
memChunkMap.entrySet().iterator();
-    while (iter.hasNext()) {
-      Entry<String, IWritableMemChunk> entry = iter.next();
-      IWritableMemChunk chunk = entry.getValue();
-      // the key is measurement rather than component of multiMeasurement
-      PartialPath fullPath = devicePath.concatNode(entry.getKey());
-      if (originalPath.matchFullPath(fullPath)) {
-        // matchFullPath ensures this branch could work on delete data of 
unary or multi measurement
-        // and delete timeseries
+    String targetMeasurement = originalPath.getMeasurement();
+    if (targetMeasurement.equals(ONE_LEVEL_PATH_WILDCARD)
+        || targetMeasurement.equals(MULTI_LEVEL_PATH_WILDCARD)) {
+      Iterator<Entry<String, IWritableMemChunk>> iter = 
memChunkMap.entrySet().iterator();
+      while (iter.hasNext()) {
+        Entry<String, IWritableMemChunk> entry = iter.next();
+        IWritableMemChunk chunk = entry.getValue();
         if (startTimestamp == Long.MIN_VALUE && endTimestamp == 
Long.MAX_VALUE) {
           iter.remove();
         }
         deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
       }
+    } else {
+      IWritableMemChunk chunk = memChunkMap.get(targetMeasurement);
+      if (chunk != null) {
+        if (startTimestamp == Long.MIN_VALUE && endTimestamp == 
Long.MAX_VALUE) {
+          memChunkMap.remove(targetMeasurement);
+        }
+        deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+      }
     }
+
     return deletedPointsNumber;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 37395e1207..1543c18ec9 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -122,6 +122,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -2212,8 +2213,7 @@ public class DataRegion {
 
     try {
 
-      PartialPath devicePath = pattern.getDevicePath();
-      Set<PartialPath> devicePaths = Collections.singleton(devicePath);
+      Set<PartialPath> devicePaths = new 
HashSet<>(pattern.getDevicePathPattern());
 
       // delete Last cache record if necessary
       // todo implement more precise process
@@ -2337,21 +2337,32 @@ public class DataRegion {
     }
 
     for (PartialPath device : devicePaths) {
-      String deviceId = device.getFullPath();
-      if (!tsFileResource.mayContainsDevice(deviceId)) {
-        // resource does not contain this device
-        continue;
+      long deviceStartTime, deviceEndTime;
+      if (device.hasWildcard()) {
+        Pair<Long, Long> startAndEndTime = 
tsFileResource.getPossibleStartTimeAndEndTime(device);
+        if (startAndEndTime == null) {
+          continue;
+        }
+        deviceStartTime = startAndEndTime.getLeft();
+        deviceEndTime = startAndEndTime.getRight();
+      } else {
+        String deviceId = device.getFullPath();
+        if (!tsFileResource.mayContainsDevice(deviceId)) {
+          // resource does not contain this device
+          continue;
+        }
+        deviceStartTime = tsFileResource.getStartTime(deviceId);
+        deviceEndTime = tsFileResource.getEndTime(deviceId);
       }
 
-      long deviceEndTime = tsFileResource.getEndTime(deviceId);
       if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
         // unsealed seq file
-        if (deleteEnd >= tsFileResource.getStartTime(deviceId)) {
+        if (deleteEnd >= deviceStartTime) {
           return false;
         }
       } else {
         // sealed file or unsealed unseq file
-        if (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart 
<= deviceEndTime) {
+        if (deleteEnd >= deviceStartTime && deleteStart <= deviceEndTime) {
           // time range of device has overlap with the deletion
           return false;
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 050f50cf92..e67203d242 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -428,6 +429,14 @@ public class TsFileResource {
     return timeIndex.mayContainsDevice(device);
   }
 
+  /**
+   * Get the min start time and max end time of devices matched by given 
devicePattern. If there's
+   * no device matched by given pattern, return null.
+   */
+  public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath 
devicePattern) {
+    return timeIndex.getPossibleStartTimeAndEndTime(devicePattern);
+  }
+
   public boolean isClosed() {
     return this.status != TsFileResourceStatus.UNCLOSED;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 5d75391cea..9e7daf659c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.SerializeUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -337,4 +340,28 @@ public class DeviceTimeIndex implements ITimeIndex {
   public boolean mayContainsDevice(String device) {
     return deviceToIndex.containsKey(device);
   }
+
+  @Override
+  public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath 
devicePattern) {
+    boolean hasMatchedDevice = false;
+    long startTime = Long.MAX_VALUE;
+    long endTime = Long.MIN_VALUE;
+    for (Entry<String, Integer> entry : deviceToIndex.entrySet()) {
+      try {
+        if (devicePattern.matchFullPath(new PartialPath(entry.getKey()))) {
+          hasMatchedDevice = true;
+          if (startTimes[entry.getValue()] < startTime) {
+            startTime = startTimes[entry.getValue()];
+          }
+          if (endTimes[entry.getValue()] > endTime) {
+            endTime = endTimes[entry.getValue()];
+          }
+        }
+      } catch (IllegalPathException e) {
+        // won't reach here
+      }
+    }
+
+    return hasMatchedDevice ? new Pair<>(startTime, endTime) : null;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index a1e9827855..9129ce240d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -19,12 +19,14 @@
 
 package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -224,4 +226,9 @@ public class FileTimeIndex implements ITimeIndex {
   public boolean mayContainsDevice(String device) {
     return true;
   }
+
+  @Override
+  public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath 
devicePattern) {
+    return new Pair<>(startTime, endTime);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index b4e127bd3b..e3dd93685b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -186,4 +188,6 @@ public interface ITimeIndex {
    * it may or may not contain this device
    */
   boolean mayContainsDevice(String device);
+
+  Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
index 03abe86fa6..9443f8c977 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.SerializeUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -173,4 +175,10 @@ public class V012FileTimeIndex implements ITimeIndex {
     throw new UnsupportedOperationException(
         "V012FileTimeIndex should be rewritten while upgrading and 
containsDevice() method should not be called any more.");
   }
+
+  @Override
+  public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath 
devicePattern) {
+    throw new UnsupportedOperationException(
+        "V012FileTimeIndex should be rewritten while upgrading and 
getPossibleStartTimeAndEndTime() method should not be called any more.");
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index e235871255..f316960b2d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -451,7 +451,7 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
         && node.getChildren().isEmpty();
   }
 
-  public List<PartialPath> getPreDeleteTimeseries(PartialPath pathPattern)
+  public List<PartialPath> getPreDeletedTimeseries(PartialPath pathPattern)
       throws MetadataException {
     List<PartialPath> result = new LinkedList<>();
     MeasurementCollector<List<PartialPath>> collector =
@@ -469,6 +469,27 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
     return result;
   }
 
+  /**
+   * Get all the devices of pre-deleted timeseries matched by given 
pathPattern. For example, given
+   * path pattern root.sg.*.s1 and pre-deleted timeseries root.sg.d1.s1, 
root.sg.d2.s1, then the
+   * result set is {root.sg.d1, root.sg.d2}.
+   */
+  public Set<PartialPath> getDevicesOfPreDeletedTimeseries(PartialPath 
pathPattern)
+      throws MetadataException {
+    Set<PartialPath> result = new HashSet<>();
+    MeasurementCollector<List<PartialPath>> collector =
+        new MeasurementCollector<List<PartialPath>>(storageGroupMNode, 
pathPattern, store) {
+          @Override
+          protected void collectMeasurement(IMeasurementMNode node) throws 
MetadataException {
+            if (node.isPreDeleted()) {
+              result.add(getCurrentPartialPath(node).getDevicePath());
+            }
+          }
+        };
+    collector.traverse();
+    return result;
+  }
+
   @Override
   public void setAlias(IMeasurementMNode measurementMNode, String alias) 
throws MetadataException {
     store.setAlias(measurementMNode, alias);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index c70e2befd4..3a146ad3e3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -138,7 +138,7 @@ public interface ISchemaRegion {
    */
   void rollbackSchemaBlackList(PathPatternTree patternTree) throws 
MetadataException;
 
-  List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree) throws 
MetadataException;
+  Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree) throws 
MetadataException;
 
   /**
    * Delete timeseries in schema black list.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index d0a26368e8..8f17757cc0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -884,19 +884,21 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
   }
 
   @Override
-  public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+  public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
       throws MetadataException {
-    List<PartialPath> pathList = new ArrayList<>();
+    Set<PartialPath> deviceBasedPathPatternSet = new HashSet<>();
     for (PartialPath pathPattern : patternTree.getAllPathPatterns()) {
-      pathList.addAll(mtree.getPreDeleteTimeseries(pathPattern));
+      for (PartialPath devicePath : 
mtree.getDevicesOfPreDeletedTimeseries(pathPattern)) {
+        
deviceBasedPathPatternSet.addAll(pathPattern.alterPrefixPath(devicePath));
+      }
     }
-    return pathList;
+    return deviceBasedPathPatternSet;
   }
 
   @Override
   public void deleteTimeseriesInBlackList(PathPatternTree patternTree) throws 
MetadataException {
     for (PartialPath pathPattern : patternTree.getAllPathPatterns()) {
-      for (PartialPath path : mtree.getPreDeleteTimeseries(pathPattern)) {
+      for (PartialPath path : mtree.getPreDeletedTimeseries(pathPattern)) {
         try {
           deleteSingleTimeseriesInBlackList(path);
           writeToMLog(new 
DeleteTimeSeriesPlan(Collections.singletonList(path)));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index e54f91fc33..5ee3512d29 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -766,7 +766,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
   }
 
   @Override
-  public List<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
+  public Set<PartialPath> fetchSchemaBlackList(PathPatternTree patternTree)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9670882ea4..e99eab0125 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -410,11 +410,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
     cache.takeWriteLock();
     try {
-      for (PartialPath pathPattern :
-          
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()))
-              .getAllPathPatterns()) {
-        cache.invalidateMatchedSchema(pathPattern);
-      }
+      // todo implement precise timeseries clean rather than clean all
+      cache.cleanUp();
     } finally {
       cache.releaseWriteLock();
     }


Reply via email to