This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c4f8ee9a168 Fix the issue that the cache key used for queries may
conflict. (#14560)
c4f8ee9a168 is described below
commit c4f8ee9a1689978d9127d1c1e0914705603768ec
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 27 12:35:03 2024 +0800
Fix the issue that the cache key used for queries may conflict. (#14560)
* use TsFileID as cache key
* add ut
* add ()
---
.../db/storageengine/buffer/BloomFilterCache.java | 28 +++--------
.../iotdb/db/storageengine/buffer/ChunkCache.java | 27 ++++-------
.../buffer/TimeSeriesMetadataCache.java | 56 ++++------------------
.../execute/task/InnerSpaceCompactionTask.java | 9 ++--
.../storageengine/buffer/BloomFilterCacheTest.java | 29 ++---------
.../repair/RepairUnsortedFileCompactionTest.java | 53 ++++++++++++++++++++
.../compaction/utils/CompactionCheckerUtils.java | 30 ++++++++----
7 files changed, 108 insertions(+), 124 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
index 5166015d1bc..e6407c6606c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -155,23 +156,11 @@ public class BloomFilterCache {
// because filePath is get from TsFileResource, different
BloomFilterCacheKey of the same file
// share this String.
private final String filePath;
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
-
- public BloomFilterCacheKey(
- String filePath,
- int regionId,
- long timePartitionId,
- long tsFileVersion,
- long compactionVersion) {
+ private final TsFileID tsFileID;
+
+ public BloomFilterCacheKey(String filePath, TsFileID tsFileID) {
this.filePath = filePath;
- this.regionId = regionId;
- this.timePartitionId = timePartitionId;
- this.tsFileVersion = tsFileVersion;
- this.compactionVersion = compactionVersion;
+ this.tsFileID = tsFileID;
}
@Override
@@ -183,15 +172,12 @@ public class BloomFilterCache {
return false;
}
BloomFilterCacheKey that = (BloomFilterCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion;
+ return Objects.equals(tsFileID, that.tsFileID);
}
@Override
public int hashCode() {
- return Objects.hash(regionId, timePartitionId, tsFileVersion,
compactionVersion);
+ return Objects.hash(tsFileID);
}
public long getRetainedSizeInBytes() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
index a604409035f..4044b5db87c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
@@ -218,11 +218,7 @@ public class ChunkCache {
// because filePath is get from TsFileResource, different ChunkCacheKey of
the same file
// share this String.
private final String filePath;
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
+ private final TsFileID tsFileID;
private final long offsetOfChunkHeader;
@@ -233,10 +229,7 @@ public class ChunkCache {
public ChunkCacheKey(
String filePath, TsFileID tsfileId, long offsetOfChunkHeader, boolean
closed) {
this.filePath = filePath;
- this.regionId = tsfileId.regionId;
- this.timePartitionId = tsfileId.timePartitionId;
- this.tsFileVersion = tsfileId.fileVersion;
- this.compactionVersion = tsfileId.compactionVersion;
+ this.tsFileID = tsfileId;
this.offsetOfChunkHeader = offsetOfChunkHeader;
this.closed = closed;
}
@@ -258,17 +251,13 @@ public class ChunkCache {
return false;
}
ChunkCacheKey that = (ChunkCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion
+ return Objects.equals(tsFileID, that.tsFileID)
&& offsetOfChunkHeader == that.offsetOfChunkHeader;
}
@Override
public int hashCode() {
- return Objects.hash(
- regionId, timePartitionId, tsFileVersion, compactionVersion,
offsetOfChunkHeader);
+ return Objects.hash(tsFileID, offsetOfChunkHeader);
}
@Override
@@ -278,13 +267,13 @@ public class ChunkCache {
+ filePath
+ '\''
+ ", regionId="
- + regionId
+ + tsFileID.regionId
+ ", timePartitionId="
- + timePartitionId
+ + tsFileID.timePartitionId
+ ", tsFileVersion="
- + tsFileVersion
+ + tsFileID.fileVersion
+ ", compactionVersion="
- + compactionVersion
+ + tsFileID.compactionVersion
+ ", offsetOfChunkHeader="
+ offsetOfChunkHeader
+ '}';
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index dfa2e104f04..f5262ea07aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -167,12 +167,7 @@ public class TimeSeriesMetadataCache {
BloomFilter bloomFilter =
BloomFilterCache.getInstance()
.get(
- new BloomFilterCache.BloomFilterCacheKey(
- filePath,
- key.regionId,
- key.timePartitionId,
- key.tsFileVersion,
- key.compactionVersion),
+ new BloomFilterCache.BloomFilterCacheKey(filePath,
key.tsFileID),
debug,
bloomFilterIoSizeRecorder,
queryContext.getQueryStatistics().getLoadBloomFilterFromCacheCount()
@@ -204,12 +199,7 @@ public class TimeSeriesMetadataCache {
for (TimeseriesMetadata metadata : timeSeriesMetadataList) {
TimeSeriesMetadataCacheKey k =
new TimeSeriesMetadataCacheKey(
- key.regionId,
- key.timePartitionId,
- key.tsFileVersion,
- key.compactionVersion,
- key.device,
- metadata.getMeasurementId());
+ key.tsFileID, key.device, metadata.getMeasurementId());
if (metadata.getStatistics().getCount() != 0) {
lruCache.put(k, metadata);
}
@@ -303,34 +293,12 @@ public class TimeSeriesMetadataCache {
RamUsageEstimator.shallowSizeOfInstance(TimeSeriesMetadataCacheKey.class)
+ RamUsageEstimator.shallowSizeOfInstance(String.class);
- private final int regionId;
- private final long timePartitionId;
- private final long tsFileVersion;
- // high 32 bit is compaction level, low 32 bit is merge count
- private final long compactionVersion;
+ private final TsFileID tsFileID;
private final IDeviceID device;
private final String measurement;
public TimeSeriesMetadataCacheKey(TsFileID tsFileID, IDeviceID device,
String measurement) {
- this.regionId = tsFileID.regionId;
- this.timePartitionId = tsFileID.timePartitionId;
- this.tsFileVersion = tsFileID.fileVersion;
- this.compactionVersion = tsFileID.compactionVersion;
- this.device = device;
- this.measurement = measurement;
- }
-
- public TimeSeriesMetadataCacheKey(
- int regionId,
- long timePartitionId,
- long tsFileVersion,
- long compactionVersion,
- IDeviceID device,
- String measurement) {
- this.regionId = regionId;
- this.timePartitionId = timePartitionId;
- this.tsFileVersion = tsFileVersion;
- this.compactionVersion = compactionVersion;
+ this.tsFileID = tsFileID;
this.device = device;
this.measurement = measurement;
}
@@ -348,31 +316,27 @@ public class TimeSeriesMetadataCache {
return false;
}
TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o;
- return regionId == that.regionId
- && timePartitionId == that.timePartitionId
- && tsFileVersion == that.tsFileVersion
- && compactionVersion == that.compactionVersion
+ return Objects.equals(tsFileID, that.tsFileID)
&& Objects.equals(device, that.device)
&& Objects.equals(measurement, that.measurement);
}
@Override
public int hashCode() {
- return Objects.hash(
- regionId, timePartitionId, tsFileVersion, compactionVersion, device,
measurement);
+ return Objects.hash(tsFileID, device, measurement);
}
@Override
public String toString() {
return "TimeSeriesMetadataCacheKey{"
+ "regionId="
- + regionId
+ + tsFileID.regionId
+ ", timePartitionId="
- + timePartitionId
+ + tsFileID.timePartitionId
+ ", tsFileVersion="
- + tsFileVersion
+ + tsFileID.fileVersion
+ ", compactionVersion="
- + compactionVersion
+ + tsFileID.compactionVersion
+ ", device='"
+ device
+ '\''
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index d66a8606d87..ea95ae82209 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -239,10 +239,11 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (!tsFileManager.isAllowCompaction()) {
return true;
}
- if ((filesView.sequence
- &&
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
- || (!filesView.sequence
- &&
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction())) {
+ if ((this.getCompactionTaskType() != CompactionTaskType.REPAIR)
+ && ((filesView.sequence
+ &&
!IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction())
+ || (!filesView.sequence
+ &&
!IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction()))) {
return true;
}
if (this.compactionConfigVersion
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
index a6c1fd0afde..83629b5eaba 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCacheTest.java
@@ -96,13 +96,7 @@ public class BloomFilterCacheTest {
for (String filePath : pathList) {
TsFileID tsFileID = new TsFileID(filePath);
BloomFilter bloomFilter =
- bloomFilterCache.get(
- new BloomFilterCache.BloomFilterCacheKey(
- filePath,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion));
+ bloomFilterCache.get(new
BloomFilterCache.BloomFilterCacheKey(filePath, tsFileID));
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(filePath, true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
Assert.assertEquals(bloomFilter1, bloomFilter);
@@ -120,12 +114,7 @@ public class BloomFilterCacheTest {
String path = pathList.get(0);
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.get(key);
TsFileSequenceReader reader = FileReaderManager.getInstance().get(path,
true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
@@ -146,12 +135,7 @@ public class BloomFilterCacheTest {
for (String path : pathList) {
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.get(key);
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(path, true);
BloomFilter bloomFilter1 = reader.readBloomFilter();
@@ -162,12 +146,7 @@ public class BloomFilterCacheTest {
for (String path : pathList) {
TsFileID tsFileID = new TsFileID(path);
BloomFilterCache.BloomFilterCacheKey key =
- new BloomFilterCache.BloomFilterCacheKey(
- path,
- tsFileID.regionId,
- tsFileID.timePartitionId,
- tsFileID.fileVersion,
- tsFileID.compactionVersion);
+ new BloomFilterCache.BloomFilterCacheKey(path, tsFileID);
BloomFilter bloomFilter = bloomFilterCache.getIfPresent(key);
Assert.assertNull(bloomFilter);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index e088dc260b1..a9c559a00a5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Repa
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -68,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RepairUnsortedFileCompactionTest extends AbstractRepairDataTest {
@@ -841,4 +844,54 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertEquals(Arrays.asList(d1, d2), devicesInTargetFile);
}
}
+
+ @Test
+ public void testQueryRepairResult() throws IOException, IllegalPathException
{
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource1)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new
TimeRange(5, 30)}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
resource1.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
+
+ TsFileResource resource2 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource2)) {
+ writer.startChunkGroup("d2");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new
TimeRange(5, 30)}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
resource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
+
+ tsFileManager.add(resource1, true);
+ tsFileManager.add(resource2, true);
+
+ RepairUnsortedFileCompactionTask task =
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource1,
true, 0);
+ Assert.assertTrue(task.start());
+
+ task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource2,
true, 0);
+ Assert.assertTrue(task.start());
+
+ List<TsFileResource> sourceTsFileResources =
tsFileManager.getTsFileList(false);
+ List<IFullPath> fullPaths = getPaths(sourceTsFileResources);
+ Map<IFullPath, List<TimeValuePair>> dataByQuery1 =
+ CompactionCheckerUtils.getDataByQuery(
+ fullPaths, sourceTsFileResources, Collections.emptyList(), true);
+ Map<IFullPath, List<TimeValuePair>> dataByQuery2 =
+ CompactionCheckerUtils.getDataByQuery(
+ fullPaths, sourceTsFileResources, Collections.emptyList(), false);
+ Assert.assertTrue(
+ CompactionCheckerUtils.compareSourceDataAndTargetData(dataByQuery1,
dataByQuery2));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
index 77e8498c858..05c9b11f5b4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
@@ -600,6 +600,14 @@ public class CompactionCheckerUtils {
return true;
}
+ public static Map<IFullPath, List<TimeValuePair>> getDataByQuery(
+ List<IFullPath> fullPaths,
+ List<TsFileResource> sequenceResources,
+ List<TsFileResource> unsequenceResources)
+ throws IllegalPathException, IOException {
+ return getDataByQuery(fullPaths, sequenceResources, unsequenceResources,
false);
+ }
+
/**
* Using SeriesRawDataBatchReader to read raw data from files, and return it
as a map.
*
@@ -612,15 +620,13 @@ public class CompactionCheckerUtils {
public static Map<IFullPath, List<TimeValuePair>> getDataByQuery(
List<IFullPath> fullPaths,
List<TsFileResource> sequenceResources,
- List<TsFileResource> unsequenceResources)
+ List<TsFileResource> unsequenceResources,
+ boolean clearCacheDuringQuery)
throws IllegalPathException, IOException {
Map<IFullPath, List<TimeValuePair>> pathDataMap = new HashMap<>();
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ clearCache();
for (int i = 0; i < fullPaths.size(); ++i) {
- FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
- TimeSeriesMetadataCache.getInstance().clear();
- ChunkCache.getInstance().clear();
- BloomFilterCache.getInstance().clear();
-
IFullPath path = fullPaths.get(i);
List<TimeValuePair> dataList = new ArrayList<>();
@@ -645,12 +651,18 @@ public class CompactionCheckerUtils {
}
pathDataMap.put(fullPaths.get(i), dataList);
- TimeSeriesMetadataCache.getInstance().clear();
- ChunkCache.getInstance().clear();
+ if (clearCacheDuringQuery) {
+ clearCache();
+ }
}
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ return pathDataMap;
+ }
+
+ private static void clearCache() {
+ BloomFilterCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
ChunkCache.getInstance().clear();
- return pathDataMap;
}
public static void validDataByValueList(