This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 3f980257195 [to dev/1.3] Fix the issue that the cache key used for
queries may conflict (#14561)
3f980257195 is described below
commit 3f9802571952b2042393717c3ebafce942e64f92
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 27 12:34:38 2024 +0800
[to dev/1.3] Fix the issue that the cache key used for queries may conflict
(#14561)
* use TsFileID as cache key
* add ut
* add ()
---
.../db/storageengine/buffer/BloomFilterCache.java | 28 +++--------
.../iotdb/db/storageengine/buffer/ChunkCache.java | 27 ++++-------
.../buffer/TimeSeriesMetadataCache.java | 56 ++++------------------
.../execute/task/InnerSpaceCompactionTask.java | 9 ++--
.../storageengine/buffer/BloomFilterCacheTest.java | 29 ++---------
.../repair/RepairUnsortedFileCompactionTest.java | 52 ++++++++++++++++++++
.../compaction/utils/CompactionCheckerUtils.java | 32 +++++++++----
7 files changed, 108 insertions(+), 125 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
index 5166015d1bc..e6407c6606c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -155,23 +156,11 @@ public class BloomFilterCache {
// because filePath is get from TsFileResource, different
BloomFilterCacheKey of the same file
// share this String.
private final String filePath;
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
-
- public BloomFilterCacheKey(
- String filePath,
- int regionId,
- long timePartitionId,
- long tsFileVersion,
- long compactionVersion) {
+ private final TsFileID tsFileID;
+
+ public BloomFilterCacheKey(String filePath, TsFileID tsFileID) {
this.filePath = filePath;
- this.regionId = regionId;
- this.timePartitionId = timePartitionId;
- this.tsFileVersion = tsFileVersion;
- this.compactionVersion = compactionVersion;
+ this.tsFileID = tsFileID;
}
@Override
@@ -183,15 +172,12 @@ public class BloomFilterCache {
return false;
}
BloomFilterCacheKey that = (BloomFilterCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion;
+ return Objects.equals(tsFileID, that.tsFileID);
}
@Override
public int hashCode() {
- return Objects.hash(regionId, timePartitionId, tsFileVersion,
compactionVersion);
+ return Objects.hash(tsFileID);
}
public long getRetainedSizeInBytes() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
index 62a0189a4b9..b87351444cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
@@ -211,11 +211,7 @@ public class ChunkCache {
// because filePath is get from TsFileResource, different ChunkCacheKey of
the same file
// share this String.
private final String filePath;
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
+ private final TsFileID tsFileID;
private final long offsetOfChunkHeader;
@@ -226,10 +222,7 @@ public class ChunkCache {
public ChunkCacheKey(
String filePath, TsFileID tsfileId, long offsetOfChunkHeader, boolean
closed) {
this.filePath = filePath;
- this.regionId = tsfileId.regionId;
- this.timePartitionId = tsfileId.timePartitionId;
- this.tsFileVersion = tsfileId.fileVersion;
- this.compactionVersion = tsfileId.compactionVersion;
+ this.tsFileID = tsfileId;
this.offsetOfChunkHeader = offsetOfChunkHeader;
this.closed = closed;
}
@@ -251,17 +244,13 @@ public class ChunkCache {
return false;
}
ChunkCacheKey that = (ChunkCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion
+ return Objects.equals(tsFileID, that.tsFileID)
&& offsetOfChunkHeader == that.offsetOfChunkHeader;
}
@Override
public int hashCode() {
- return Objects.hash(
- regionId, timePartitionId, tsFileVersion, compactionVersion,
offsetOfChunkHeader);
+ return Objects.hash(tsFileID, offsetOfChunkHeader);
}
@Override
@@ -271,13 +260,13 @@ public class ChunkCache {
+ filePath
+ '\''
+ ", regionId="
- + regionId
+ + tsFileID.regionId
+ ", timePartitionId="
- + timePartitionId
+ + tsFileID.timePartitionId
+ ", tsFileVersion="
- + tsFileVersion
+ + tsFileID.fileVersion
+ ", compactionVersion="
- + compactionVersion
+ + tsFileID.compactionVersion
+ ", offsetOfChunkHeader="
+ offsetOfChunkHeader
+ '}';
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index f32d424ae0d..44324ae2226 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -170,12 +170,7 @@ public class TimeSeriesMetadataCache {
BloomFilter bloomFilter =
BloomFilterCache.getInstance()
.get(
- new BloomFilterCache.BloomFilterCacheKey(
- filePath,
- key.regionId,
- key.timePartitionId,
- key.tsFileVersion,
- key.compactionVersion),
+ new BloomFilterCache.BloomFilterCacheKey(filePath,
key.tsFileID),
debug,
bloomFilterIoSizeRecorder,
queryContext.getQueryStatistics().getLoadBloomFilterFromCacheCount()
@@ -209,12 +204,7 @@ public class TimeSeriesMetadataCache {
for (TimeseriesMetadata metadata : timeSeriesMetadataList) {
TimeSeriesMetadataCacheKey k =
new TimeSeriesMetadataCacheKey(
- key.regionId,
- key.timePartitionId,
- key.tsFileVersion,
- key.compactionVersion,
- key.device,
- metadata.getMeasurementId());
+ key.tsFileID, key.device, metadata.getMeasurementId());
if (metadata.getStatistics().getCount() != 0) {
lruCache.put(k, metadata);
}
@@ -308,34 +298,12 @@ public class TimeSeriesMetadataCache {
RamUsageEstimator.shallowSizeOfInstance(TimeSeriesMetadataCacheKey.class)
+ RamUsageEstimator.shallowSizeOfInstance(String.class);
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
+ private final TsFileID tsFileID;
private final IDeviceID device;
private final String measurement;
public TimeSeriesMetadataCacheKey(TsFileID tsFileID, IDeviceID device,
String measurement) {
- this.regionId = tsFileID.regionId;
- this.timePartitionId = tsFileID.timePartitionId;
- this.tsFileVersion = tsFileID.fileVersion;
- this.compactionVersion = tsFileID.compactionVersion;
- this.device = device;
- this.measurement = measurement;
- }
-
- public TimeSeriesMetadataCacheKey(
- int regionId,
- long timePartitionId,
- long tsFileVersion,
- long compactionVersion,
- IDeviceID device,
- String measurement) {
- this.regionId = regionId;
- this.timePartitionId = timePartitionId;
- this.tsFileVersion = tsFileVersion;
- this.compactionVersion = compactionVersion;
+ this.tsFileID = tsFileID;
this.device = device;
this.measurement = measurement;
}
@@ -353,31 +321,27 @@ public class TimeSeriesMetadataCache {
return false;
}
TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion
+ return Objects.equals(tsFileID, that.tsFileID)
&& Objects.equals(device, that.device)
&& Objects.equals(measurement, that.measurement);
}
@Override
public int hashCode() {
- return Objects.hash(
- regionId, timePartitionId, tsFileVersion, compactionVersion, device,
measurement);
+ return Objects.hash(tsFileID, device, measurement);
}
@Override
public String toString() {
return "TimeSeriesMetadataCacheKey{"
+ "regionId="
- + regionId
+ + tsFileID.regionId
+ ", timePartitionId="
- + timePartitionId
+ + tsFileID.timePartitionId
+ ", tsFileVersion="
- + tsFileVersion
+ + tsFileID.fileVersion
+ ", compactionVersion="
- + compactionVersion
+ + tsFileID.compactionVersion
+ ", device='"
+ ((PlainDeviceID) device).toStringID()
+ '\''
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index cb0e72c3220..e3ab9429e97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -219,10 +219,11 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (!tsFileManager.isAllowCompaction()) {
return true;
}
- if ((filesView.sequence
- &&
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
- || (!filesView.sequence
- &&
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction())) {
+ if ((this.getCompactionTaskType() != CompactionTaskType.REPAIR)
+ && ((filesView.sequence
+ &&
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
+ || (!filesView.sequence
+ &&
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction()))) {
return true;
}
if (this.compactionConfigVersion
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
index 00744e9b09c..e6a65d5eed9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
@@ -95,13 +95,7 @@ public class BloomFilterCacheTest {
for (String filePath : pathList) {
TsFileID tsFileID = new TsFileID(filePath);
BloomFilter bloomFilter =
- bloomFilterCache.get(
- new BloomFilterCache.BloomFilterCacheKey(
- filePath,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion));
+ bloomFilterCache.get(new
BloomFilterCache.BloomFilterCacheKey(filePath, tsFileID));
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(filePath, true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
Assert.assertEquals(bloomFilter1, bloomFilter);
@@ -119,12 +113,7 @@ public class BloomFilterCacheTest {
String path = pathList.get(0);
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.get(key);
TsFileSequenceReader reader = FileReaderManager.getInstance().get(path,
true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
@@ -145,12 +134,7 @@ public class BloomFilterCacheTest {
for (String path : pathList) {
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.get(key);
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(path, true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
@@ -161,12 +145,7 @@ public class BloomFilterCacheTest {
for (String path : pathList) {
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.getIfPresent(key);
Assert.assertNull(bloomFilter);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 28f862f9371..4dd666f1d0b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Repa
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -68,6 +69,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RepairUnsortedFileCompactionTest extends AbstractRepairDataTest {
@@ -841,4 +843,54 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertEquals(Arrays.asList(d1, d2), devicesInTargetFile);
}
}
+
+ @Test
+ public void testQueryRepairResult() throws IOException, IllegalPathException
{
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource1)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new
TimeRange(5, 30)}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
resource1.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
+
+ TsFileResource resource2 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource2)) {
+ writer.startChunkGroup("d2");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new
TimeRange(5, 30)}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
resource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
+
+ tsFileManager.add(resource1, true);
+ tsFileManager.add(resource2, true);
+
+ RepairUnsortedFileCompactionTask task =
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource1,
true, 0);
+ Assert.assertTrue(task.start());
+
+ task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource2,
true, 0);
+ Assert.assertTrue(task.start());
+
+ List<TsFileResource> sourceTsFileResources =
tsFileManager.getTsFileList(false);
+ List<PartialPath> fullPaths = getPaths(sourceTsFileResources);
+ Map<PartialPath, List<TimeValuePair>> dataByQuery1 =
+ CompactionCheckerUtils.getDataByQuery(
+ fullPaths, sourceTsFileResources, Collections.emptyList(), true);
+ Map<PartialPath, List<TimeValuePair>> dataByQuery2 =
+ CompactionCheckerUtils.getDataByQuery(
+ fullPaths, sourceTsFileResources, Collections.emptyList(), false);
+ Assert.assertTrue(
+ CompactionCheckerUtils.compareSourceDataAndTargetData(dataByQuery1,
dataByQuery2));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
index 88f888a92a6..43b6842cc87 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
@@ -601,6 +601,14 @@ public class CompactionCheckerUtils {
return true;
}
+ public static Map<PartialPath, List<TimeValuePair>> getDataByQuery(
+ List<PartialPath> fullPaths,
+ List<TsFileResource> sequenceResources,
+ List<TsFileResource> unsequenceResources)
+ throws IllegalPathException, IOException {
+ return getDataByQuery(fullPaths, sequenceResources, unsequenceResources,
false);
+ }
+
/**
* Using SeriesRawDataBatchReader to read raw data from files, and return it
as a map.
*
@@ -613,15 +621,13 @@ public class CompactionCheckerUtils {
public static Map<PartialPath, List<TimeValuePair>> getDataByQuery(
List<PartialPath> fullPaths,
List<TsFileResource> sequenceResources,
- List<TsFileResource> unsequenceResources)
- throws IOException {
+ List<TsFileResource> unsequenceResources,
+ boolean clearCacheDuringQuery)
+ throws IllegalPathException, IOException {
Map<PartialPath, List<TimeValuePair>> pathDataMap = new HashMap<>();
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ clearCache();
for (int i = 0; i < fullPaths.size(); ++i) {
- FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
- TimeSeriesMetadataCache.getInstance().clear();
- ChunkCache.getInstance().clear();
- BloomFilterCache.getInstance().clear();
-
PartialPath path = fullPaths.get(i);
List<TimeValuePair> dataList = new ArrayList<>();
@@ -646,12 +652,18 @@ public class CompactionCheckerUtils {
}
pathDataMap.put(fullPaths.get(i), dataList);
- TimeSeriesMetadataCache.getInstance().clear();
- ChunkCache.getInstance().clear();
+ if (clearCacheDuringQuery) {
+ clearCache();
+ }
}
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ return pathDataMap;
+ }
+
+ private static void clearCache() {
+ BloomFilterCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
ChunkCache.getInstance().clear();
- return pathDataMap;
}
public static void validDataByValueList(