This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/table_disk_usage_statistics_with_cache by this push:
     new 7ac968ce3b8 fix cache writer lifecycle
7ac968ce3b8 is described below

commit 7ac968ce3b8f0604e57da6f2acd19ecff87b6531
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jan 26 14:40:22 2026 +0800

    fix cache writer lifecycle
---
 .../db/storageengine/dataregion/DataRegion.java    |  3 +-
 .../tableDiskUsageCache/TableDiskUsageCache.java   | 77 +++++++++++-----------
 .../TableDiskUsageCacheReader.java                 |  5 +-
 3 files changed, 42 insertions(+), 43 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 44a54c769e3..de2981a9767 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -417,8 +417,7 @@ public class DataRegion implements IDataRegionForQuery {
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
             databaseName + "-" + dataRegionIdString + "-UpgradeMod");
 
-    TableDiskUsageCache.getInstance()
-        .registerRegion(databaseName, Integer.parseInt(dataRegionIdString));
+    TableDiskUsageCache.getInstance().registerRegion(this);
 
     // recover tsfiles unless consensus protocol is ratis and storage engine 
is not ready
     if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index eb19aeffe27..a0f2fa4bab2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
@@ -135,20 +136,20 @@ public class TableDiskUsageCache {
   }
 
   public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
-      String database, int regionId, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
     StartReadOperation operation =
-        new StartReadOperation(database, regionId, readTsFileCache, 
readObjectFileCache);
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
     addOperationToQueue(operation);
     return operation.future;
   }
 
-  public void endRead(String database, int regionId) {
-    EndReadOperation operation = new EndReadOperation(database, regionId);
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
     addOperationToQueue(operation);
   }
 
-  public void registerRegion(String database, int regionId) {
-    RegisterRegionOperation operation = new RegisterRegionOperation(database, 
regionId);
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
     addOperationToQueue(operation);
   }
 
@@ -177,8 +178,9 @@ public class TableDiskUsageCache {
     writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
   }
 
-  protected DataRegionTableSizeCacheWriter createWriter(String database, int 
regionId) {
-    return new DataRegionTableSizeCacheWriter(database, regionId);
+  protected DataRegionTableSizeCacheWriter createWriter(
+      String database, int regionId, DataRegion region) {
+    return new DataRegionTableSizeCacheWriter(database, regionId, region);
   }
 
   protected TsFileTableSizeCacheReader createTsFileCacheReader(
@@ -218,43 +220,33 @@ public class TableDiskUsageCache {
   }
 
   protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
     protected final boolean readTsFileCache;
     protected final boolean readObjectFileCache;
     public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
         new CompletableFuture<>();
 
     public StartReadOperation(
-        String database, int regionId, boolean readTsFileCache, boolean 
readObjectFileCache) {
-      super(database, regionId);
+        DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      super(dataRegion.getDatabaseName(), 
dataRegion.getDataRegionId().getId());
+      this.region = dataRegion;
       this.readTsFileCache = readTsFileCache;
       this.readObjectFileCache = readObjectFileCache;
     }
 
     @Override
     public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
       try {
-        DataRegionTableSizeCacheWriter writer =
-            tableDiskUsageCache.writerMap.computeIfAbsent(
-                regionId, k -> tableDiskUsageCache.createWriter(database, 
regionId));
-        // It is safe to always increase activeReaderNum here. Before a 
DataRegion is removed, it is
-        // first marked as deleted, and all table size queries will skip 
DataRegions that are
-        // already marked deleted.
-        // Under this guarantee, waiting for activeReaderNum to reach zero 
will not be blocked by
-        // newly created readers, and the region can be safely removed.
-        writer.increaseActiveReaderNum();
-        if (writer.getRemovedFuture() != null) {
+        if (writer == null || writer.getRemovedFuture() != null) {
           // region is removed
           future.complete(
               new Pair<>(
-                  new TsFileTableSizeCacheReader(
-                      0,
-                      writer.tsFileCacheWriter.getKeyFile(),
-                      0,
-                      writer.tsFileCacheWriter.getValueFile(),
-                      regionId),
+                  new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
                   new EmptyObjectTableSizeCacheReader()));
           return;
         }
+        writer.increaseActiveReaderNum();
         writer.flush();
         TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
             readTsFileCache ? 
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
@@ -270,8 +262,11 @@ public class TableDiskUsageCache {
   }
 
   private static class EndReadOperation extends Operation {
-    public EndReadOperation(String database, int regionId) {
-      super(database, regionId);
+    protected final DataRegion region;
+
+    public EndReadOperation(DataRegion dataRegion) {
+      super(dataRegion.getDatabaseName(), 
dataRegion.getDataRegionId().getId());
+      this.region = dataRegion;
     }
 
     @Override
@@ -279,6 +274,9 @@ public class TableDiskUsageCache {
       tableDiskUsageCache.writerMap.computeIfPresent(
           regionId,
           (k, writer) -> {
+            if (writer.dataRegion != region) {
+              return writer;
+            }
             writer.decreaseActiveReaderNum();
             if (writer.getRemovedFuture() != null) {
               writer.getRemovedFuture().complete(null);
@@ -303,12 +301,11 @@ public class TableDiskUsageCache {
 
     @Override
     public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
-      tableDiskUsageCache
-          .writerMap
-          .computeIfAbsent(
-              regionId, k -> tableDiskUsageCache.createWriter(database, 
tsFileID.regionId))
-          .tsFileCacheWriter
-          .write(tsFileID, tableSizeMap);
+      DataRegionTableSizeCacheWriter dataRegionTableSizeCacheWriter =
+          tableDiskUsageCache.writerMap.get(regionId);
+      if (dataRegionTableSizeCacheWriter != null) {
+        dataRegionTableSizeCacheWriter.tsFileCacheWriter.write(tsFileID, 
tableSizeMap);
+      }
     }
   }
 
@@ -333,16 +330,18 @@ public class TableDiskUsageCache {
 
   protected static class RegisterRegionOperation extends Operation {
 
+    protected final DataRegion dataRegion;
     protected final CompletableFuture<Void> future = new CompletableFuture<>();
 
-    public RegisterRegionOperation(String database, int regionId) {
-      super(database, regionId);
+    public RegisterRegionOperation(DataRegion dataRegion) {
+      super(dataRegion.getDatabaseName(), 
dataRegion.getDataRegionId().getId());
+      this.dataRegion = dataRegion;
     }
 
     @Override
     public void apply(TableDiskUsageCache tableDiskUsageCache) {
       tableDiskUsageCache.writerMap.computeIfAbsent(
-          regionId, regionId -> tableDiskUsageCache.createWriter(database, 
regionId));
+          regionId, regionId -> tableDiskUsageCache.createWriter(database, 
regionId, dataRegion));
       future.complete(null);
     }
 
@@ -394,11 +393,13 @@ public class TableDiskUsageCache {
   }
 
   protected static class DataRegionTableSizeCacheWriter {
+    protected final DataRegion dataRegion;
     protected final TsFileTableDiskUsageCacheWriter tsFileCacheWriter;
     protected int activeReaderNum = 0;
     protected CompletableFuture<Void> removedFuture;
 
-    protected DataRegionTableSizeCacheWriter(String database, int regionId) {
+    protected DataRegionTableSizeCacheWriter(String database, int regionId, 
DataRegion dataRegion) {
+      this.dataRegion = dataRegion;
       this.tsFileCacheWriter = new TsFileTableDiskUsageCacheWriter(database, 
regionId);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index cdd72fc064c..c72bdc72e3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -76,8 +76,7 @@ public class TableDiskUsageCacheReader implements Closeable {
     if (this.tsFileTableSizeCacheReader == null) {
       this.prepareReaderFuture =
           this.prepareReaderFuture == null
-              ? TableDiskUsageCache.getInstance()
-                  .startRead(dataRegion.getDatabaseName(), regionId, true, 
true)
+              ? TableDiskUsageCache.getInstance().startRead(dataRegion, true, 
true)
               : this.prepareReaderFuture;
       do {
         if (prepareReaderFuture.isDone()) {
@@ -180,7 +179,7 @@ public class TableDiskUsageCacheReader implements Closeable 
{
     closeTsFileTableSizeCacheReader();
     closeObjectFileTableSizeCacheReader();
     if (prepareReaderFuture != null) {
-      TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(), 
regionId);
+      TableDiskUsageCache.getInstance().endRead(dataRegion);
       prepareReaderFuture = null;
     }
     dataRegionContext.releaseMemory();

Reply via email to