This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c085d13af2 Decrease TTL Deletion in compaction modification cache (#12687)
9c085d13af2 is described below

commit 9c085d13af2780488a8bef9132e8eab321173f45
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 14 16:17:43 2024 +0800

    Decrease TTL Deletion in compaction modification cache (#12687)
    
    * decrease TTL Deletion in compaction modification cache
    
    * Update iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
    
    Co-authored-by: Jiang Tian <[email protected]>
    
    * modify MultiTsFileDeviceIterator
    
    * fix spotless
    
    ---------
    
    Co-authored-by: Jiang Tian <[email protected]>
---
 .../CompactionLastTimeCheckFailedException.java    |  17 +++
 .../impl/ReadChunkCompactionPerformer.java         |   7 +-
 .../execute/utils/CompactionPathUtils.java         |  11 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   | 123 +++++++++------------
 .../readchunk/SingleSeriesCompactionExecutor.java  |  21 ++--
 5 files changed, 83 insertions(+), 96 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
index 6ffcfad8cab..26458513ba8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
 public class CompactionLastTimeCheckFailedException extends RuntimeException {
 
   public CompactionLastTimeCheckFailedException(
@@ -32,6 +36,19 @@ public class CompactionLastTimeCheckFailedException extends RuntimeException {
             + lastTimestamp);
   }
 
+  public CompactionLastTimeCheckFailedException(
+      IDeviceID device, String measurement, long currentTimestamp, long lastTimestamp) {
+    super(
+        "Timestamp of the current point of "
+            + device
+            + IoTDBConstant.PATH_SEPARATOR
+            + measurement
+            + " is "
+            + currentTimestamp
+            + ", which should be later than the last time "
+            + lastTimestamp);
+  }
+
   @Override
   public Throwable fillInStackTrace() {
     return this;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index 67b98f175f8..51f6a7db434 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,13 +21,11 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTargetFileCountExceededException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
@@ -182,9 +180,8 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
         
deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
     while (seriesIterator.hasNextSeries()) {
       checkThreadInterrupted();
-      String series = seriesIterator.nextSeries();
+      String measurement = seriesIterator.nextSeries();
       // TODO: we can provide a configuration item to enable concurrent 
between each series
-      PartialPath path = CompactionPathUtils.getPath(device, series);
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
       // remove the chunk metadata whose data type not match the data type of 
last chunk
@@ -192,7 +189,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
           filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
           new SingleSeriesCompactionExecutor(
-              path, readerAndChunkMetadataList, writer, targetResource, 
summary);
+              device, measurement, readerAndChunkMetadataList, writer, 
targetResource, summary);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
     writer.endChunkGroup();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 20de56d10fd..2cd4cf722c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -35,14 +35,7 @@ public class CompactionPathUtils {
 
   public static PartialPath getPath(IDeviceID device, String measurement)
       throws IllegalPathException {
-    PartialPath path;
-    String plainDeviceId = ((PlainDeviceID) device).toStringID();
-    if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
-      path = 
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
-    } else {
-      path = new PartialPath(((PlainDeviceID) 
device).toStringID().split(PATH_SEPARATER_NO_REGEX));
-    }
-    return path.concatNode(measurement);
+    return getPath(device).concatNode(measurement);
   }
 
   public static PartialPath getPath(IDeviceID device) throws 
IllegalPathException {
@@ -51,7 +44,7 @@ public class CompactionPathUtils {
     if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
       path = 
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
     } else {
-      path = new PartialPath(((PlainDeviceID) 
device).toStringID().split(PATH_SEPARATER_NO_REGEX));
+      path = new PartialPath(plainDeviceId.split(PATH_SEPARATER_NO_REGEX));
     }
     return path;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index aa53aed93cc..4836119ec05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -70,6 +70,7 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
   private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = 
new HashMap<>();
   private final Map<TsFileResource, List<Modification>> modificationCache = 
new HashMap<>();
   private Pair<IDeviceID, Boolean> currentDevice = null;
+  private long timeLowerBoundForCurrentDevice;
 
   /**
    * Used for compaction with read chunk performer.
@@ -174,7 +175,7 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
    * @return Pair of device full path and whether this device is aligned
    */
   @SuppressWarnings("squid:S135")
-  public Pair<IDeviceID, Boolean> nextDevice() {
+  public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException {
     List<TsFileResource> toBeRemovedResources = new LinkedList<>();
     Pair<IDeviceID, Boolean> minDevice = null;
     // get the device from source files sorted from the newest to the oldest 
by version
@@ -205,6 +206,11 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
     for (TsFileResource resource : toBeRemovedResources) {
       deviceIteratorMap.remove(resource);
     }
+
+    timeLowerBoundForCurrentDevice =
+        CommonDateTimeUtils.currentTime()
+            - DataNodeTTLCache.getInstance()
+                .getTTL(((PlainDeviceID) 
currentDevice.getLeft()).toStringID());
     return currentDevice;
   }
 
@@ -398,59 +404,45 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
       // all the value chunks is empty chunk
       return;
     }
+    IDeviceID device = currentDevice.getLeft();
+    Deletion ttlDeletion = null;
+    if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
+      ttlDeletion =
+          new Deletion(
+              CompactionPathUtils.getPath(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+              Long.MAX_VALUE,
+              Long.MIN_VALUE,
+              timeLowerBoundForCurrentDevice);
+    }
 
     List<Modification> modifications =
         modificationCache.computeIfAbsent(
             tsFileResource,
-            r -> {
-              List<Modification> list =
-                  new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
-              // add outdated device mods by ttl
-              try {
-                for (IDeviceID device : r.getDevices()) {
-                  // TODO: remove deviceId conversion
-                  long timeLowerBound =
-                      CommonDateTimeUtils.currentTime()
-                          - DataNodeTTLCache.getInstance()
-                              .getTTL(((PlainDeviceID) device).toStringID());
-                  if (r.getStartTime(device) < timeLowerBound) {
-                    list.add(
-                        new Deletion(
-                            CompactionPathUtils.getPath(device)
-                                
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
-                            Long.MAX_VALUE,
-                            Long.MIN_VALUE,
-                            timeLowerBound));
-                  }
-                }
-              } catch (IllegalPathException e) {
-                throw new RuntimeException(e);
-              }
-              return list;
-            });
+            r -> new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
 
     // construct the input params List<List<Modification>> for 
QueryUtils.modifyAlignedChunkMetaData
     AlignedChunkMetadata alignedChunkMetadata = 
alignedChunkMetadataList.get(0);
     List<IChunkMetadata> valueChunkMetadataList = 
alignedChunkMetadata.getValueChunkMetadataList();
     List<List<Modification>> modificationForCurDevice = new ArrayList<>();
-    List<PartialPath> valueSeriesPaths = new 
ArrayList<>(valueChunkMetadataList.size());
-    for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
-      modificationForCurDevice.add(new ArrayList<>());
-      IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(i);
-      valueSeriesPaths.add(
-          valueChunkMetadata == null
-              ? null
-              : CompactionPathUtils.getPath(
-                  currentDevice.left, valueChunkMetadata.getMeasurementUid()));
-    }
-
-    for (Modification modification : modifications) {
-      for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
-        PartialPath path = valueSeriesPaths.get(i);
-        if (path != null && modification.getPath().matchFullPath(path)) {
-          modificationForCurDevice.get(i).add(modification);
+    for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) {
+      if (valueChunkMetadata == null) {
+        modificationForCurDevice.add(Collections.emptyList());
+        continue;
+      }
+      List<Modification> modificationList = new ArrayList<>();
+      PartialPath path =
+          CompactionPathUtils.getPath(
+              currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid());
+      for (Modification modification : modifications) {
+        if (modification.getPath().matchFullPath(path)) {
+          modificationList.add(modification);
         }
       }
+      if (ttlDeletion != null) {
+        modificationList.add(ttlDeletion);
+      }
+      modificationForCurDevice.add(
+          modificationList.isEmpty() ? Collections.emptyList() : 
modificationList);
     }
 
     ModificationUtils.modifyAlignedChunkMetaData(
@@ -624,17 +616,27 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
       if (seriesInThisIteration.isEmpty()) {
         return new LinkedList<>();
       }
+      IDeviceID device = currentDevice.getLeft();
       currentCompactingSeries = seriesInThisIteration.removeFirst();
 
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
           readerAndChunkMetadataForThisSeries = new LinkedList<>();
-      PartialPath path =
-          CompactionPathUtils.getPath(currentDevice.getLeft(), 
currentCompactingSeries);
+      PartialPath path = CompactionPathUtils.getPath(device, 
currentCompactingSeries);
 
       for (TsFileResource resource : tsFileResourcesSortedByAsc) {
         TsFileSequenceReader reader = readerMap.get(resource);
         Map<String, List<ChunkMetadata>> chunkMetadataListMap = 
chunkMetadataCacheMap.get(reader);
 
+        Deletion ttlDeletion = null;
+        if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
+          ttlDeletion =
+              new Deletion(
+                  CompactionPathUtils.getPath(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+                  Long.MAX_VALUE,
+                  Long.MIN_VALUE,
+                  timeLowerBoundForCurrentDevice);
+        }
+
         if (chunkMetadataListMap.containsKey(currentCompactingSeries)) {
           // get the chunk metadata list and modification list of current 
series in this tsfile
           List<ChunkMetadata> chunkMetadataListInThisResource =
@@ -644,32 +646,7 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
           List<Modification> modificationsInThisResource =
               modificationCache.computeIfAbsent(
                   resource,
-                  r -> {
-                    List<Modification> list =
-                        new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
-                    // add outdated device mods by ttl
-                    try {
-                      for (IDeviceID device : r.getDevices()) {
-                        // TODO: remove deviceId conversion
-                        long timeLowerBound =
-                            CommonDateTimeUtils.currentTime()
-                                - DataNodeTTLCache.getInstance()
-                                    .getTTL(((PlainDeviceID) 
device).toStringID());
-                        if (r.getStartTime(device) < timeLowerBound) {
-                          list.add(
-                              new Deletion(
-                                  CompactionPathUtils.getPath(device)
-                                      
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
-                                  Long.MAX_VALUE,
-                                  Long.MIN_VALUE,
-                                  timeLowerBound));
-                        }
-                      }
-                    } catch (IllegalPathException e) {
-                      throw new RuntimeException(e);
-                    }
-                    return list;
-                  });
+                  r -> new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
           LinkedList<Modification> modificationForCurrentSeries = new 
LinkedList<>();
           // collect the modifications for current series
           for (Modification modification : modificationsInThisResource) {
@@ -677,6 +654,10 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
               modificationForCurrentSeries.add(modification);
             }
           }
+          // add ttl deletion for current series
+          if (ttlDeletion != null) {
+            modificationForCurrentSeries.add(ttlDeletion);
+          }
 
           // if there are modifications of current series, apply them to the 
chunk metadata
           if (!modificationForCurrentSeries.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 59a55e716ae..7e85af0a9fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -19,7 +19,6 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
@@ -29,7 +28,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Chunk;
@@ -49,7 +47,7 @@ import java.util.List;
 @SuppressWarnings("squid:S1319")
 public class SingleSeriesCompactionExecutor {
   private IDeviceID device;
-  private PartialPath series;
+  private String measurement;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList;
   private CompactionTsFileWriter fileWriter;
   private TsFileResource targetResource;
@@ -75,13 +73,13 @@ public class SingleSeriesCompactionExecutor {
       
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
 
   public SingleSeriesCompactionExecutor(
-      PartialPath series,
+      IDeviceID device,
       IMeasurementSchema measurementSchema,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
       CompactionTsFileWriter fileWriter,
       TsFileResource targetResource) {
-    this.device = new PlainDeviceID(series.getDevice());
-    this.series = series;
+    this.device = device;
+    this.measurement = measurementSchema.getMeasurementId();
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
     this.schema = measurementSchema;
@@ -93,13 +91,14 @@ public class SingleSeriesCompactionExecutor {
   }
 
   public SingleSeriesCompactionExecutor(
-      PartialPath series,
+      IDeviceID device,
+      String measurement,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
       CompactionTsFileWriter fileWriter,
       TsFileResource targetResource,
       CompactionTaskSummary summary) {
-    this.device = new PlainDeviceID(series.getDevice());
-    this.series = series;
+    this.device = device;
+    this.measurement = measurement;
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
     this.schema = null;
@@ -167,7 +166,7 @@ public class SingleSeriesCompactionExecutor {
     ChunkHeader chunkHeader = chunk.getHeader();
     this.schema =
         new MeasurementSchema(
-            series.getMeasurement(),
+            measurement,
             chunkHeader.getDataType(),
             chunkHeader.getEncodingType(),
             chunkHeader.getCompressionType());
@@ -371,7 +370,7 @@ public class SingleSeriesCompactionExecutor {
   private void checkAndUpdatePreviousTimestamp(long currentWritingTimestamp) {
     if (currentWritingTimestamp <= lastWriteTimestamp) {
       throw new CompactionLastTimeCheckFailedException(
-          series.getFullPath(), currentWritingTimestamp, lastWriteTimestamp);
+          device, measurement, currentWritingTimestamp, lastWriteTimestamp);
     } else {
       lastWriteTimestamp = currentWritingTimestamp;
     }

Reply via email to