This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch reduceDuplicatedDeviceIdInCompactionSelection
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d10c10a806c7f8a9d9860e8461809abbbab0f89
Author: shuwenwei <[email protected]>
AuthorDate: Mon Sep 1 10:03:22 2025 +0800

    reduce duplicated DeviceID in compaction selection
---
 .../compaction/execute/utils/CompactionUtils.java  |  5 ++++
 .../schedule/CompactionScheduleContext.java        | 27 +++++++++++++++++++
 .../selector/utils/TsFileResourceCandidate.java    |  7 ++++-
 .../dataregion/tsfile/TsFileResource.java          | 13 ++++++---
 .../tsfile/timeindex/ArrayDeviceTimeIndex.java     |  5 ++--
 .../dataregion/tsfile/timeindex/FileTimeIndex.java |  3 ++-
 .../dataregion/tsfile/timeindex/ITimeIndex.java    | 10 ++++---
 .../tsfile/timeindex/PlainDeviceTimeIndex.java     |  3 ++-
 .../compaction/CompactionSchedulerTest.java        | 31 ++++++++++++++++++++++
 9 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 57191a99a4e..b7d87233deb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -462,6 +462,11 @@ public class CompactionUtils {
 
   public static ArrayDeviceTimeIndex buildDeviceTimeIndex(TsFileResource resource)
       throws IOException {
+    return buildDeviceTimeIndex(resource, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
+  }
+
+  public static ArrayDeviceTimeIndex buildDeviceTimeIndex(
+      TsFileResource resource, IDeviceID.Deserializer deserializer) throws IOException {
     long resourceFileSize =
         new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).length();
     CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 1f01cac0b25..f9721b14796 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -30,6 +30,11 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Sett
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -49,6 +54,7 @@ public class CompactionScheduleContext {
   // end region
 
   private final Map<TsFileResource, ArrayDeviceTimeIndex> partitionFileDeviceInfoCache;
+  private final Map<IDeviceID, IDeviceID> deviceIdCache;
   private long cachedDeviceInfoSize = 0;
 
   private final Set<Long> timePartitionsDelayInsertionSelection;
@@ -56,6 +62,7 @@ public class CompactionScheduleContext {
   public CompactionScheduleContext() {
     this.partitionFileDeviceInfoCache = new HashMap<>();
     this.timePartitionsDelayInsertionSelection = new HashSet<>();
+    this.deviceIdCache = new HashMap<>();
   }
 
   public void delayInsertionSelection(long timePartitionId) {
@@ -81,6 +88,7 @@ public class CompactionScheduleContext {
 
   public void clearTimePartitionDeviceInfoCache() {
     partitionFileDeviceInfoCache.clear();
+    deviceIdCache.clear();
     CompactionMetrics.getInstance()
         .decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize);
     cachedDeviceInfoSize = 0;
@@ -192,4 +200,23 @@ public class CompactionScheduleContext {
   public ICrossCompactionPerformer getCrossCompactionPerformer() {
     return IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance();
   }
+
+  public IDeviceID.Deserializer getCachedDeviceIdDeserializer() {
+    return new CachedIDeviceIdDeserializer();
+  }
+
+  private class CachedIDeviceIdDeserializer implements IDeviceID.Deserializer {
+
+    @Override
+    public IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
+      IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer);
+      return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
+    }
+
+    @Override
+    public IDeviceID deserializeFrom(InputStream inputStream) throws IOException {
+      IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+      return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
index a2b0b2d40bf..acde3c607f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
@@ -100,7 +100,12 @@ public class TsFileResourceCandidate {
           deviceTimeIndex = new ArrayDeviceTimeIndex();
           return;
         }
-        deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
+        deviceTimeIndex =
+            CompactionUtils.buildDeviceTimeIndex(
+                resource,
+                compactionScheduleContext == null
+                    ? IDeviceID.Deserializer.DEFAULT_DESERIALIZER
+                    : compactionScheduleContext.getCachedDeviceIdDeserializer());
       } finally {
         resource.readUnlock();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index eb03718accc..77c1fbb5376 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -318,7 +318,8 @@ public class TsFileResource implements PersistentResource {
     try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
       // The first byte is VERSION_NUMBER, second byte is timeIndexType.
       ReadWriteIOUtils.readByte(inputStream);
-      timeIndex = ITimeIndex.createTimeIndex(inputStream);
+      timeIndex =
+          ITimeIndex.createTimeIndex(inputStream, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
       maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
       minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
 
@@ -674,7 +675,8 @@ public class TsFileResource implements PersistentResource {
     return timeIndex.getDevices(file.getPath(), this);
   }
 
-  public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+  public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer)
+      throws IOException {
     readLock();
     try {
       if (!resourceFileExists()) {
@@ -684,7 +686,8 @@ public class TsFileResource implements PersistentResource {
           FSFactoryProducer.getFSFactory()
               .getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
         ReadWriteIOUtils.readByte(inputStream);
-        ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
+        ITimeIndex timeIndexFromResourceFile =
+            ITimeIndex.createTimeIndex(inputStream, deserializer);
         if (!(timeIndexFromResourceFile instanceof ArrayDeviceTimeIndex)) {
           throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
         }
@@ -698,6 +701,10 @@ public class TsFileResource implements PersistentResource {
     }
   }
 
+  public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+    return buildDeviceTimeIndex(IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
+  }
+
   /**
    * Used for compaction to verify tsfile, also used to verify TimeIndex version when loading tsfile
    */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
index f8503c79af5..8499b6d6b3d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
@@ -113,7 +113,8 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public ArrayDeviceTimeIndex deserialize(
+      InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
 
     startTimes = new long[deviceNum];
@@ -127,7 +128,7 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
     }
 
     for (int i = 0; i < deviceNum; i++) {
-      IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+      IDeviceID deviceID = deserializer.deserializeFrom(inputStream);
       int index = ReadWriteIOUtils.readInt(inputStream);
       deviceToIndex.put(deviceID, index);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index 0e967fc622d..e4a812012a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -72,7 +72,8 @@ public class FileTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public FileTimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index 214ae53750c..d705a2417d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -53,7 +53,8 @@ public interface ITimeIndex {
    * @param inputStream inputStream
    * @return TimeIndex
    */
-  ITimeIndex deserialize(InputStream inputStream) throws IOException;
+  ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException;
 
   /**
    * deserialize from byte buffer
@@ -218,11 +219,14 @@ public interface ITimeIndex {
    */
   byte getTimeIndexType();
 
-  static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
+  static ITimeIndex createTimeIndex(InputStream inputStream, IDeviceID.Deserializer deserializer)
+      throws IOException {
     byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
     if (timeIndexType == -1) {
       throw new IOException("The end of stream has been reached");
     }
-    return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
+    return TimeIndexLevel.valueOf(timeIndexType)
+        .getTimeIndex()
+        .deserialize(inputStream, deserializer);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
index b962a447b06..309067d1672 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
@@ -41,7 +41,8 @@ public class PlainDeviceTimeIndex extends ArrayDeviceTimeIndex implements ITimeI
   }
 
   @Override
-  public PlainDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+  public PlainDeviceTimeIndex deserialize(
+      InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
 
     startTimes = new long[deviceNum];
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 3d914cbeb3e..01242fabba9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
@@ -42,7 +43,10 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1839,6 +1843,33 @@ public class CompactionSchedulerTest {
     }
   }
 
+  @Test
+  public void testManyDuplicatedDevicesInDifferentResources() throws IOException {
+    String sgName = COMPACTION_TEST_SG + "test18";
+    TsFileResource resource1 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    IDeviceID device = new StringArrayDeviceID("root.test.d1");
+    resource1.updateStartTime(device, 1);
+    resource1.updateStartTime(device, 2);
+    resource1.serialize();
+    resource1.degradeTimeIndex();
+
+    TsFileResource resource2 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
+    device = new StringArrayDeviceID("root.test.d1");
+    resource2.updateStartTime(device, 1);
+    resource2.updateStartTime(device, 2);
+    resource2.serialize();
+    resource2.degradeTimeIndex();
+
+    CompactionScheduleContext context = new CompactionScheduleContext();
+    IDeviceID.Deserializer deserializer = context.getCachedDeviceIdDeserializer();
+
+    IDeviceID deserializedFromResource1 =
+        resource1.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+    IDeviceID deserializedFromResource2 =
+        resource2.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
+    Assert.assertSame(deserializedFromResource1, deserializedFromResource2);
+  }
+
   public void stopCompactionTaskManager() {
     CompactionTaskManager.getInstance().clearCandidateQueue();
     long sleepTime = 0;

Reply via email to