This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9c085d13af2 Decrease TTL Deletion in compaction modification cache
(#12687)
9c085d13af2 is described below
commit 9c085d13af2780488a8bef9132e8eab321173f45
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 14 16:17:43 2024 +0800
Decrease TTL Deletion in compaction modification cache (#12687)
* decrease TTL Deletion in compaction modification cache
* Update
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
Co-authored-by: Jiang Tian <[email protected]>
* modify MultiTsFileDeviceIterator
* fix spotless
---------
Co-authored-by: Jiang Tian <[email protected]>
---
.../CompactionLastTimeCheckFailedException.java | 17 +++
.../impl/ReadChunkCompactionPerformer.java | 7 +-
.../execute/utils/CompactionPathUtils.java | 11 +-
.../execute/utils/MultiTsFileDeviceIterator.java | 123 +++++++++------------
.../readchunk/SingleSeriesCompactionExecutor.java | 21 ++--
5 files changed, 83 insertions(+), 96 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
index 6ffcfad8cab..26458513ba8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java
@@ -19,6 +19,10 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
public class CompactionLastTimeCheckFailedException extends RuntimeException {
public CompactionLastTimeCheckFailedException(
@@ -32,6 +36,19 @@ public class CompactionLastTimeCheckFailedException extends
RuntimeException {
+ lastTimestamp);
}
+ public CompactionLastTimeCheckFailedException(
+ IDeviceID device, String measurement, long currentTimestamp, long
lastTimestamp) {
+ super(
+ "Timestamp of the current point of "
+ + device
+ + IoTDBConstant.PATH_SEPARATOR
+ + measurement
+ + " is "
+ + currentTimestamp
+ + ", which should be later than the last time "
+ + lastTimestamp);
+ }
+
@Override
public Throwable fillInStackTrace() {
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index 67b98f175f8..51f6a7db434 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,13 +21,11 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTargetFileCountExceededException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
@@ -182,9 +180,8 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted();
- String series = seriesIterator.nextSeries();
+ String measurement = seriesIterator.nextSeries();
// TODO: we can provide a configuration item to enable concurrent
between each series
- PartialPath path = CompactionPathUtils.getPath(device, series);
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
// remove the chunk metadata whose data type not match the data type of
last chunk
@@ -192,7 +189,7 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
- path, readerAndChunkMetadataList, writer, targetResource,
summary);
+ device, measurement, readerAndChunkMetadataList, writer,
targetResource, summary);
compactionExecutorOfCurrentTimeSeries.execute();
}
writer.endChunkGroup();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 20de56d10fd..2cd4cf722c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -35,14 +35,7 @@ public class CompactionPathUtils {
public static PartialPath getPath(IDeviceID device, String measurement)
throws IllegalPathException {
- PartialPath path;
- String plainDeviceId = ((PlainDeviceID) device).toStringID();
- if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
- path =
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
- } else {
- path = new PartialPath(((PlainDeviceID)
device).toStringID().split(PATH_SEPARATER_NO_REGEX));
- }
- return path.concatNode(measurement);
+ return getPath(device).concatNode(measurement);
}
public static PartialPath getPath(IDeviceID device) throws
IllegalPathException {
@@ -51,7 +44,7 @@ public class CompactionPathUtils {
if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
path =
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
} else {
- path = new PartialPath(((PlainDeviceID)
device).toStringID().split(PATH_SEPARATER_NO_REGEX));
+ path = new PartialPath(plainDeviceId.split(PATH_SEPARATER_NO_REGEX));
}
return path;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index aa53aed93cc..4836119ec05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -70,6 +70,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap =
new HashMap<>();
private final Map<TsFileResource, List<Modification>> modificationCache =
new HashMap<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
+ private long timeLowerBoundForCurrentDevice;
/**
* Used for compaction with read chunk performer.
@@ -174,7 +175,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
* @return Pair of device full path and whether this device is aligned
*/
@SuppressWarnings("squid:S135")
- public Pair<IDeviceID, Boolean> nextDevice() {
+ public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException {
List<TsFileResource> toBeRemovedResources = new LinkedList<>();
Pair<IDeviceID, Boolean> minDevice = null;
// get the device from source files sorted from the newest to the oldest
by version
@@ -205,6 +206,11 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
for (TsFileResource resource : toBeRemovedResources) {
deviceIteratorMap.remove(resource);
}
+
+ timeLowerBoundForCurrentDevice =
+ CommonDateTimeUtils.currentTime()
+ - DataNodeTTLCache.getInstance()
+ .getTTL(((PlainDeviceID)
currentDevice.getLeft()).toStringID());
return currentDevice;
}
@@ -398,59 +404,45 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
// all the value chunks is empty chunk
return;
}
+ IDeviceID device = currentDevice.getLeft();
+ Deletion ttlDeletion = null;
+ if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
+ ttlDeletion =
+ new Deletion(
+ CompactionPathUtils.getPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+ Long.MAX_VALUE,
+ Long.MIN_VALUE,
+ timeLowerBoundForCurrentDevice);
+ }
List<Modification> modifications =
modificationCache.computeIfAbsent(
tsFileResource,
- r -> {
- List<Modification> list =
- new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
- // add outdated device mods by ttl
- try {
- for (IDeviceID device : r.getDevices()) {
- // TODO: remove deviceId conversion
- long timeLowerBound =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance()
- .getTTL(((PlainDeviceID) device).toStringID());
- if (r.getStartTime(device) < timeLowerBound) {
- list.add(
- new Deletion(
- CompactionPathUtils.getPath(device)
-
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
- Long.MAX_VALUE,
- Long.MIN_VALUE,
- timeLowerBound));
- }
- }
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
- }
- return list;
- });
+ r -> new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
// construct the input params List<List<Modification>> for
QueryUtils.modifyAlignedChunkMetaData
AlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(0);
List<IChunkMetadata> valueChunkMetadataList =
alignedChunkMetadata.getValueChunkMetadataList();
List<List<Modification>> modificationForCurDevice = new ArrayList<>();
- List<PartialPath> valueSeriesPaths = new
ArrayList<>(valueChunkMetadataList.size());
- for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
- modificationForCurDevice.add(new ArrayList<>());
- IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(i);
- valueSeriesPaths.add(
- valueChunkMetadata == null
- ? null
- : CompactionPathUtils.getPath(
- currentDevice.left, valueChunkMetadata.getMeasurementUid()));
- }
-
- for (Modification modification : modifications) {
- for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
- PartialPath path = valueSeriesPaths.get(i);
- if (path != null && modification.getPath().matchFullPath(path)) {
- modificationForCurDevice.get(i).add(modification);
+ for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) {
+ if (valueChunkMetadata == null) {
+ modificationForCurDevice.add(Collections.emptyList());
+ continue;
+ }
+ List<Modification> modificationList = new ArrayList<>();
+ PartialPath path =
+ CompactionPathUtils.getPath(
+ currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid());
+ for (Modification modification : modifications) {
+ if (modification.getPath().matchFullPath(path)) {
+ modificationList.add(modification);
}
}
+ if (ttlDeletion != null) {
+ modificationList.add(ttlDeletion);
+ }
+ modificationForCurDevice.add(
+ modificationList.isEmpty() ? Collections.emptyList() :
modificationList);
}
ModificationUtils.modifyAlignedChunkMetaData(
@@ -624,17 +616,27 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
if (seriesInThisIteration.isEmpty()) {
return new LinkedList<>();
}
+ IDeviceID device = currentDevice.getLeft();
currentCompactingSeries = seriesInThisIteration.removeFirst();
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataForThisSeries = new LinkedList<>();
- PartialPath path =
- CompactionPathUtils.getPath(currentDevice.getLeft(),
currentCompactingSeries);
+ PartialPath path = CompactionPathUtils.getPath(device,
currentCompactingSeries);
for (TsFileResource resource : tsFileResourcesSortedByAsc) {
TsFileSequenceReader reader = readerMap.get(resource);
Map<String, List<ChunkMetadata>> chunkMetadataListMap =
chunkMetadataCacheMap.get(reader);
+ Deletion ttlDeletion = null;
+ if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
+ ttlDeletion =
+ new Deletion(
+ CompactionPathUtils.getPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+ Long.MAX_VALUE,
+ Long.MIN_VALUE,
+ timeLowerBoundForCurrentDevice);
+ }
+
if (chunkMetadataListMap.containsKey(currentCompactingSeries)) {
// get the chunk metadata list and modification list of current
series in this tsfile
List<ChunkMetadata> chunkMetadataListInThisResource =
@@ -644,32 +646,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
List<Modification> modificationsInThisResource =
modificationCache.computeIfAbsent(
resource,
- r -> {
- List<Modification> list =
- new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
- // add outdated device mods by ttl
- try {
- for (IDeviceID device : r.getDevices()) {
- // TODO: remove deviceId conversion
- long timeLowerBound =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance()
- .getTTL(((PlainDeviceID)
device).toStringID());
- if (r.getStartTime(device) < timeLowerBound) {
- list.add(
- new Deletion(
- CompactionPathUtils.getPath(device)
-
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
- Long.MAX_VALUE,
- Long.MIN_VALUE,
- timeLowerBound));
- }
- }
- } catch (IllegalPathException e) {
- throw new RuntimeException(e);
- }
- return list;
- });
+ r -> new
LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
LinkedList<Modification> modificationForCurrentSeries = new
LinkedList<>();
// collect the modifications for current series
for (Modification modification : modificationsInThisResource) {
@@ -677,6 +654,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
modificationForCurrentSeries.add(modification);
}
}
+ // add ttl deletion for current series
+ if (ttlDeletion != null) {
+ modificationForCurrentSeries.add(ttlDeletion);
+ }
// if there are modifications of current series, apply them to the
chunk metadata
if (!modificationForCurrentSeries.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 59a55e716ae..7e85af0a9fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -19,7 +19,6 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
@@ -29,7 +28,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
@@ -49,7 +47,7 @@ import java.util.List;
@SuppressWarnings("squid:S1319")
public class SingleSeriesCompactionExecutor {
private IDeviceID device;
- private PartialPath series;
+ private String measurement;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList;
private CompactionTsFileWriter fileWriter;
private TsFileResource targetResource;
@@ -75,13 +73,13 @@ public class SingleSeriesCompactionExecutor {
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
public SingleSeriesCompactionExecutor(
- PartialPath series,
+ IDeviceID device,
IMeasurementSchema measurementSchema,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
CompactionTsFileWriter fileWriter,
TsFileResource targetResource) {
- this.device = new PlainDeviceID(series.getDevice());
- this.series = series;
+ this.device = device;
+ this.measurement = measurementSchema.getMeasurementId();
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
this.schema = measurementSchema;
@@ -93,13 +91,14 @@ public class SingleSeriesCompactionExecutor {
}
public SingleSeriesCompactionExecutor(
- PartialPath series,
+ IDeviceID device,
+ String measurement,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
CompactionTsFileWriter fileWriter,
TsFileResource targetResource,
CompactionTaskSummary summary) {
- this.device = new PlainDeviceID(series.getDevice());
- this.series = series;
+ this.device = device;
+ this.measurement = measurement;
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
this.schema = null;
@@ -167,7 +166,7 @@ public class SingleSeriesCompactionExecutor {
ChunkHeader chunkHeader = chunk.getHeader();
this.schema =
new MeasurementSchema(
- series.getMeasurement(),
+ measurement,
chunkHeader.getDataType(),
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType());
@@ -371,7 +370,7 @@ public class SingleSeriesCompactionExecutor {
private void checkAndUpdatePreviousTimestamp(long currentWritingTimestamp) {
if (currentWritingTimestamp <= lastWriteTimestamp) {
throw new CompactionLastTimeCheckFailedException(
- series.getFullPath(), currentWritingTimestamp, lastWriteTimestamp);
+ device, measurement, currentWritingTimestamp, lastWriteTimestamp);
} else {
lastWriteTimestamp = currentWritingTimestamp;
}