This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e879c5497e0 Reduce duplicated DeviceID in compaction selection (#16314)
e879c5497e0 is described below
commit e879c5497e00b3b872cfd5dc82477a6b785d1f5a
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 30 12:25:08 2025 +0800
Reduce duplicated DeviceID in compaction selection (#16314)
* reduce duplicated DeviceID in compaction selection
* fix ut
---
.../compaction/execute/utils/CompactionUtils.java | 5 ++++
.../schedule/CompactionScheduleContext.java | 27 +++++++++++++++++
.../selector/utils/TsFileResourceCandidate.java | 7 ++++-
.../dataregion/tsfile/TsFileResource.java | 13 ++++++--
.../tsfile/timeindex/ArrayDeviceTimeIndex.java | 5 ++--
.../dataregion/tsfile/timeindex/FileTimeIndex.java | 3 +-
.../dataregion/tsfile/timeindex/ITimeIndex.java | 10 +++++--
.../tsfile/timeindex/PlainDeviceTimeIndex.java | 3 +-
.../compaction/CompactionSchedulerTest.java | 35 ++++++++++++++++++++++
9 files changed, 97 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 1e3f494ae8c..61943047e54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -482,6 +482,11 @@ public class CompactionUtils {
public static ArrayDeviceTimeIndex buildDeviceTimeIndex(TsFileResource
resource)
throws IOException {
+ return buildDeviceTimeIndex(resource,
IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
+ }
+
+ public static ArrayDeviceTimeIndex buildDeviceTimeIndex(
+ TsFileResource resource, IDeviceID.Deserializer deserializer) throws
IOException {
long resourceFileSize =
new File(resource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).length();
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 2bfc515c5dc..6f0f7872789 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -33,7 +33,11 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDevice
import org.apache.iotdb.db.utils.EncryptDBUtils;
import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -53,6 +57,7 @@ public class CompactionScheduleContext {
// end region
private final Map<TsFileResource, ArrayDeviceTimeIndex>
partitionFileDeviceInfoCache;
+ private final Map<IDeviceID, IDeviceID> deviceIdCache;
private long cachedDeviceInfoSize = 0;
private final Set<Long> timePartitionsDelayInsertionSelection;
@@ -63,12 +68,14 @@ public class CompactionScheduleContext {
public CompactionScheduleContext() {
this.partitionFileDeviceInfoCache = new HashMap<>();
this.timePartitionsDelayInsertionSelection = new HashSet<>();
+ this.deviceIdCache = new HashMap<>();
this.encryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam();
}
public CompactionScheduleContext(EncryptParameter encryptParameter) {
this.partitionFileDeviceInfoCache = new HashMap<>();
this.timePartitionsDelayInsertionSelection = new HashSet<>();
+ this.deviceIdCache = new HashMap<>();
this.encryptParameter = encryptParameter;
}
@@ -95,6 +102,7 @@ public class CompactionScheduleContext {
public void clearTimePartitionDeviceInfoCache() {
partitionFileDeviceInfoCache.clear();
+ deviceIdCache.clear();
CompactionMetrics.getInstance()
.decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize);
cachedDeviceInfoSize = 0;
@@ -209,4 +217,23 @@ public class CompactionScheduleContext {
.getCrossCompactionPerformer()
.createInstance(encryptParameter);
}
+
+  /**
+   * Creates a device-id deserializer that shares this context's {@code deviceIdCache}. Device ids
+   * that are equal but read from different resource files therefore resolve to one instance. The
+   * cache lives until {@code clearTimePartitionDeviceInfoCache()} clears it.
+   *
+   * @return a deserializer backed by this context's device-id cache
+   */
+  public IDeviceID.Deserializer getCachedDeviceIdDeserializer() {
+    final IDeviceID.Deserializer cachingDeserializer = new CachedIDeviceIdDeserializer();
+    return cachingDeserializer;
+  }
+
+  /**
+   * {@link IDeviceID.Deserializer} that parses with the default deserializer and then swaps the
+   * result for the instance already held in the enclosing context's {@code deviceIdCache}. If no
+   * such instance exists, the fresh one is registered. Intentionally a non-static inner class,
+   * because it reads the outer instance's cache.
+   *
+   * <p>NOTE(review): the cache is a plain {@code HashMap}, so this presumably runs on a single
+   * scheduling thread — confirm before sharing a context across threads.
+   */
+  private class CachedIDeviceIdDeserializer implements IDeviceID.Deserializer {
+
+    @Override
+    public IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
+      return intern(IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer));
+    }
+
+    @Override
+    public IDeviceID deserializeFrom(InputStream inputStream) throws IOException {
+      return intern(IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream));
+    }
+
+    /** Returns the cached id equal to {@code freshId}, registering {@code freshId} if absent. */
+    private IDeviceID intern(IDeviceID freshId) {
+      return deviceIdCache.computeIfAbsent(freshId, key -> key);
+    }
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
index a2b0b2d40bf..acde3c607f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
@@ -100,7 +100,12 @@ public class TsFileResourceCandidate {
deviceTimeIndex = new ArrayDeviceTimeIndex();
return;
}
- deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
+ deviceTimeIndex =
+ CompactionUtils.buildDeviceTimeIndex(
+ resource,
+ compactionScheduleContext == null
+ ? IDeviceID.Deserializer.DEFAULT_DESERIALIZER
+ :
compactionScheduleContext.getCachedDeviceIdDeserializer());
} finally {
resource.readUnlock();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 3169cf85ccb..9a827360f70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -320,7 +320,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file +
RESOURCE_SUFFIX)) {
// The first byte is VERSION_NUMBER, second byte is timeIndexType.
ReadWriteIOUtils.readByte(inputStream);
- timeIndex = ITimeIndex.createTimeIndex(inputStream);
+ timeIndex =
+ ITimeIndex.createTimeIndex(inputStream,
IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
@@ -676,7 +677,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
return timeIndex.getDevices(file.getPath(), this);
}
- public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+ public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer
deserializer)
+ throws IOException {
readLock();
try {
if (!resourceFileExists()) {
@@ -686,7 +688,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
FSFactoryProducer.getFSFactory()
.getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
ReadWriteIOUtils.readByte(inputStream);
- ITimeIndex timeIndexFromResourceFile =
ITimeIndex.createTimeIndex(inputStream);
+ ITimeIndex timeIndexFromResourceFile =
+ ITimeIndex.createTimeIndex(inputStream, deserializer);
if (!(timeIndexFromResourceFile instanceof ArrayDeviceTimeIndex)) {
throw new IOException("cannot build DeviceTimeIndex from resource "
+ file.getPath());
}
@@ -700,6 +703,10 @@ public class TsFileResource implements PersistentResource, Cloneable {
}
}
+  /**
+   * Builds an {@link ArrayDeviceTimeIndex} from this resource's {@code .resource} file. Device
+   * ids are decoded with {@link IDeviceID.Deserializer#DEFAULT_DESERIALIZER}.
+   *
+   * @return the device time index read from the resource file
+   * @throws IOException if reading fails or the stored index is not an ArrayDeviceTimeIndex
+   */
+  public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+    final IDeviceID.Deserializer defaultDeserializer = IDeviceID.Deserializer.DEFAULT_DESERIALIZER;
+    return buildDeviceTimeIndex(defaultDeserializer);
+  }
+
/**
* Used for compaction to verify tsfile, also used to verify TimeIndex
version when loading tsfile
*/
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
index f8503c79af5..8499b6d6b3d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
@@ -113,7 +113,8 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
}
@Override
- public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws
IOException {
+ public ArrayDeviceTimeIndex deserialize(
+ InputStream inputStream, IDeviceID.Deserializer deserializer) throws
IOException {
int deviceNum = ReadWriteIOUtils.readInt(inputStream);
startTimes = new long[deviceNum];
@@ -127,7 +128,7 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
}
for (int i = 0; i < deviceNum; i++) {
- IDeviceID deviceID =
Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+ IDeviceID deviceID = deserializer.deserializeFrom(inputStream);
int index = ReadWriteIOUtils.readInt(inputStream);
deviceToIndex.put(deviceID, index);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index 0e967fc622d..e4a812012a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -72,7 +72,8 @@ public class FileTimeIndex implements ITimeIndex {
}
@Override
- public FileTimeIndex deserialize(InputStream inputStream) throws IOException
{
+ public FileTimeIndex deserialize(InputStream inputStream,
IDeviceID.Deserializer deserializer)
+ throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index 214ae53750c..d705a2417d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -53,7 +53,8 @@ public interface ITimeIndex {
* @param inputStream inputStream
* @return TimeIndex
*/
- ITimeIndex deserialize(InputStream inputStream) throws IOException;
+ ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer
deserializer)
+ throws IOException;
/**
* deserialize from byte buffer
@@ -218,11 +219,14 @@ public interface ITimeIndex {
*/
byte getTimeIndexType();
- static ITimeIndex createTimeIndex(InputStream inputStream) throws
IOException {
+ static ITimeIndex createTimeIndex(InputStream inputStream,
IDeviceID.Deserializer deserializer)
+ throws IOException {
byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
if (timeIndexType == -1) {
throw new IOException("The end of stream has been reached");
}
- return
TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
+ return TimeIndexLevel.valueOf(timeIndexType)
+ .getTimeIndex()
+ .deserialize(inputStream, deserializer);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
index b962a447b06..309067d1672 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
@@ -41,7 +41,8 @@ public class PlainDeviceTimeIndex extends ArrayDeviceTimeIndex implements ITimeI
}
@Override
- public PlainDeviceTimeIndex deserialize(InputStream inputStream) throws
IOException {
+ public PlainDeviceTimeIndex deserialize(
+ InputStream inputStream, IDeviceID.Deserializer deserializer) throws
IOException {
int deviceNum = ReadWriteIOUtils.readInt(inputStream);
startTimes = new long[deviceNum];
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 3d914cbeb3e..b1377dbbba6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
@@ -42,7 +43,10 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -1839,6 +1843,37 @@ public class CompactionSchedulerTest {
}
}
+  /**
+   * Two resource files each record an equal, separately allocated device id. Decoding both with a
+   * single context's cached deserializer must return the very same {@link IDeviceID} instance.
+   */
+  @Test
+  public void testManyDuplicatedDevicesInDifferentResources() throws IOException {
+    String sgName = COMPACTION_TEST_SG + "test18";
+
+    TsFileResource resource1 =
+        CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    IDeviceID deviceInResource1 = new StringArrayDeviceID("root.test.d1");
+    // Give the device a real [1, 2] time range; previously the start time was set twice.
+    resource1.updateStartTime(deviceInResource1, 1);
+    resource1.updateEndTime(deviceInResource1, 2);
+    File dir = resource1.getTsFile().getParentFile();
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    resource1.serialize();
+    resource1.degradeTimeIndex();
+
+    TsFileResource resource2 =
+        CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    // A separate allocation, so any sharing observed below comes from the cache.
+    IDeviceID deviceInResource2 = new StringArrayDeviceID("root.test.d1");
+    resource2.updateStartTime(deviceInResource2, 1);
+    resource2.updateEndTime(deviceInResource2, 2);
+    resource2.serialize();
+    resource2.degradeTimeIndex();
+
+    CompactionScheduleContext context = new CompactionScheduleContext();
+    IDeviceID.Deserializer deserializer = context.getCachedDeviceIdDeserializer();
+
+    IDeviceID deserializedFromResource1 =
+        resource1.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+    IDeviceID deserializedFromResource2 =
+        resource2.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+
+    // The cache must hand back the right device, not just any shared instance.
+    Assert.assertEquals(new StringArrayDeviceID("root.test.d1"), deserializedFromResource1);
+    Assert.assertSame(deserializedFromResource1, deserializedFromResource2);
+  }
+
public void stopCompactionTaskManager() {
CompactionTaskManager.getInstance().clearCandidateQueue();
long sleepTime = 0;