This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e879c5497e0 Reduce duplicated DeviceID in compaction selection (#16314)
e879c5497e0 is described below

commit e879c5497e00b3b872cfd5dc82477a6b785d1f5a
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 30 12:25:08 2025 +0800

    Reduce duplicated DeviceID in compaction selection (#16314)
    
    * reduce duplicated DeviceID in compaction selection
    
    * fix ut
---
 .../compaction/execute/utils/CompactionUtils.java  |  5 ++++
 .../schedule/CompactionScheduleContext.java        | 27 +++++++++++++++++
 .../selector/utils/TsFileResourceCandidate.java    |  7 ++++-
 .../dataregion/tsfile/TsFileResource.java          | 13 ++++++--
 .../tsfile/timeindex/ArrayDeviceTimeIndex.java     |  5 ++--
 .../dataregion/tsfile/timeindex/FileTimeIndex.java |  3 +-
 .../dataregion/tsfile/timeindex/ITimeIndex.java    | 10 +++++--
 .../tsfile/timeindex/PlainDeviceTimeIndex.java     |  3 +-
 .../compaction/CompactionSchedulerTest.java        | 35 ++++++++++++++++++++++
 9 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 1e3f494ae8c..61943047e54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -482,6 +482,11 @@ public class CompactionUtils {
 
   public static ArrayDeviceTimeIndex buildDeviceTimeIndex(TsFileResource resource)
       throws IOException {
+    return buildDeviceTimeIndex(resource, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
+  }
+
+  public static ArrayDeviceTimeIndex buildDeviceTimeIndex(
+      TsFileResource resource, IDeviceID.Deserializer deserializer) throws IOException {
     long resourceFileSize =
         new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).length();
     CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 2bfc515c5dc..6f0f7872789 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -33,7 +33,11 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDevice
 import org.apache.iotdb.db.utils.EncryptDBUtils;
 
 import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.file.metadata.IDeviceID;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -53,6 +57,7 @@ public class CompactionScheduleContext {
   // end region
 
   private final Map<TsFileResource, ArrayDeviceTimeIndex> partitionFileDeviceInfoCache;
+  private final Map<IDeviceID, IDeviceID> deviceIdCache;
   private long cachedDeviceInfoSize = 0;
 
   private final Set<Long> timePartitionsDelayInsertionSelection;
@@ -63,12 +68,14 @@ public class CompactionScheduleContext {
   public CompactionScheduleContext() {
     this.partitionFileDeviceInfoCache = new HashMap<>();
     this.timePartitionsDelayInsertionSelection = new HashSet<>();
+    this.deviceIdCache = new HashMap<>();
     this.encryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam();
   }
 
   public CompactionScheduleContext(EncryptParameter encryptParameter) {
     this.partitionFileDeviceInfoCache = new HashMap<>();
     this.timePartitionsDelayInsertionSelection = new HashSet<>();
+    this.deviceIdCache = new HashMap<>();
     this.encryptParameter = encryptParameter;
   }
 
@@ -95,6 +102,7 @@ public class CompactionScheduleContext {
 
   public void clearTimePartitionDeviceInfoCache() {
     partitionFileDeviceInfoCache.clear();
+    deviceIdCache.clear();
     CompactionMetrics.getInstance()
         .decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize);
     cachedDeviceInfoSize = 0;
@@ -209,4 +217,23 @@ public class CompactionScheduleContext {
         .getCrossCompactionPerformer()
         .createInstance(encryptParameter);
   }
+
+  public IDeviceID.Deserializer getCachedDeviceIdDeserializer() {
+    return new CachedIDeviceIdDeserializer();
+  }
+
+  private class CachedIDeviceIdDeserializer implements IDeviceID.Deserializer {
+
+    @Override
+    public IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
+      IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer);
+      return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
+    }
+
+    @Override
+    public IDeviceID deserializeFrom(InputStream inputStream) throws IOException {
+      IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+      return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
index a2b0b2d40bf..acde3c607f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
@@ -100,7 +100,12 @@ public class TsFileResourceCandidate {
           deviceTimeIndex = new ArrayDeviceTimeIndex();
           return;
         }
-        deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
+        deviceTimeIndex =
+            CompactionUtils.buildDeviceTimeIndex(
+                resource,
+                compactionScheduleContext == null
+                    ? IDeviceID.Deserializer.DEFAULT_DESERIALIZER
+                    : compactionScheduleContext.getCachedDeviceIdDeserializer());
       } finally {
         resource.readUnlock();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 3169cf85ccb..9a827360f70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -320,7 +320,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
     try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
       // The first byte is VERSION_NUMBER, second byte is timeIndexType.
       ReadWriteIOUtils.readByte(inputStream);
-      timeIndex = ITimeIndex.createTimeIndex(inputStream);
+      timeIndex =
+          ITimeIndex.createTimeIndex(inputStream, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
       maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
       minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
 
@@ -676,7 +677,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
     return timeIndex.getDevices(file.getPath(), this);
   }
 
-  public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+  public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer)
+      throws IOException {
     readLock();
     try {
       if (!resourceFileExists()) {
@@ -686,7 +688,8 @@ public class TsFileResource implements PersistentResource, Cloneable {
           FSFactoryProducer.getFSFactory()
               .getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
         ReadWriteIOUtils.readByte(inputStream);
-        ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
+        ITimeIndex timeIndexFromResourceFile =
+            ITimeIndex.createTimeIndex(inputStream, deserializer);
         if (!(timeIndexFromResourceFile instanceof ArrayDeviceTimeIndex)) {
           throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
         }
@@ -700,6 +703,10 @@ public class TsFileResource implements PersistentResource, Cloneable {
     }
   }
 
+  public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+    return buildDeviceTimeIndex(IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
+  }
+
   /**
    * Used for compaction to verify tsfile, also used to verify TimeIndex version when loading tsfile
    */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
index f8503c79af5..8499b6d6b3d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
@@ -113,7 +113,8 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public ArrayDeviceTimeIndex deserialize(
+      InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
 
     startTimes = new long[deviceNum];
@@ -127,7 +128,7 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
     }
 
     for (int i = 0; i < deviceNum; i++) {
-      IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+      IDeviceID deviceID = deserializer.deserializeFrom(inputStream);
       int index = ReadWriteIOUtils.readInt(inputStream);
       deviceToIndex.put(deviceID, index);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index 0e967fc622d..e4a812012a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -72,7 +72,8 @@ public class FileTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public FileTimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index 214ae53750c..d705a2417d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -53,7 +53,8 @@ public interface ITimeIndex {
    * @param inputStream inputStream
    * @return TimeIndex
    */
-  ITimeIndex deserialize(InputStream inputStream) throws IOException;
+  ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException;
 
   /**
    * deserialize from byte buffer
@@ -218,11 +219,14 @@ public interface ITimeIndex {
    */
   byte getTimeIndexType();
 
-  static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
+  static ITimeIndex createTimeIndex(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException {
     byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
     if (timeIndexType == -1) {
       throw new IOException("The end of stream has been reached");
     }
-    return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
+    return TimeIndexLevel.valueOf(timeIndexType)
+        .getTimeIndex()
+        .deserialize(inputStream, deserializer);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
index b962a447b06..309067d1672 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
@@ -41,7 +41,8 @@ public class PlainDeviceTimeIndex extends ArrayDeviceTimeIndex implements ITimeI
   }
 
   @Override
-  public PlainDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public PlainDeviceTimeIndex deserialize(
+      InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
 
     startTimes = new long[deviceNum];
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 3d914cbeb3e..b1377dbbba6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
@@ -42,7 +43,10 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1839,6 +1843,37 @@ public class CompactionSchedulerTest {
     }
   }
 
+  @Test
+  public void testManyDuplicatedDevicesInDifferentResources() throws IOException {
+    String sgName = COMPACTION_TEST_SG + "test18";
+    TsFileResource resource1 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    IDeviceID device = new StringArrayDeviceID("root.test.d1");
+    resource1.updateStartTime(device, 1);
+    resource1.updateStartTime(device, 2);
+    File dir = resource1.getTsFile().getParentFile();
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    resource1.serialize();
+    resource1.degradeTimeIndex();
+
+    TsFileResource resource2 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    device = new StringArrayDeviceID("root.test.d1");
+    resource2.updateStartTime(device, 1);
+    resource2.updateStartTime(device, 2);
+    resource2.serialize();
+    resource2.degradeTimeIndex();
+
+    CompactionScheduleContext context = new CompactionScheduleContext();
+    IDeviceID.Deserializer deserializer = context.getCachedDeviceIdDeserializer();
+
+    IDeviceID deserializedFromResource1 =
+        resource1.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+    IDeviceID deserializedFromResource2 =
+        resource2.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+    Assert.assertSame(deserializedFromResource1, deserializedFromResource2);
+  }
+
   public void stopCompactionTaskManager() {
     CompactionTaskManager.getInstance().clearCandidateQueue();
     long sleepTime = 0;

Reply via email to