This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/table_disk_usage_statistics_with_cache by this push:
     new 51336a36b43 close
51336a36b43 is described below

commit 51336a36b437c059cee2fa3c49dc89cc85386ff4
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jan 27 10:08:50 2026 +0800

    close
---
 .../iotdb/db/storageengine/StorageEngine.java      |  2 ++
 .../tableDiskUsageCache/TableDiskUsageCache.java   | 39 +++++++++++++++++-----
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 964f6edcd76..66d1d6d0d79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -71,6 +71,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
 import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import 
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
@@ -419,6 +420,7 @@ public class StorageEngine implements IService {
     if (cachedThreadPool != null) {
       cachedThreadPool.shutdownNow();
     }
+    TableDiskUsageCache.getInstance().close();
     dataRegionMap.clear();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index 30d9fec327b..f7ee6057c86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
@@ -44,12 +45,13 @@ import java.util.concurrent.TimeUnit;
 
 public class TableDiskUsageCache {
   protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
-  protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
   // regionId -> writer mapping
   protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
   protected final ScheduledExecutorService scheduledExecutorService;
   private int processedOperationCountSinceLastPeriodicCheck = 0;
   protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
 
   protected TableDiskUsageCache() {
     scheduledExecutorService =
@@ -60,7 +62,7 @@ public class TableDiskUsageCache {
 
   protected void run() {
     try {
-      while (!Thread.currentThread().isInterrupted()) {
+      while (!stop) {
         try {
           for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
             syncTsFileTableSizeCacheIfNecessary(writer);
@@ -75,6 +77,7 @@ public class TableDiskUsageCache {
             performPeriodicMaintenance();
           }
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
           return;
         } catch (Exception e) {
           LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
@@ -163,6 +166,9 @@ public class TableDiskUsageCache {
 
   public void registerRegion(DataRegion region) {
     RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!PathUtils.isTableModelDatabase(region.getDatabaseName())) {
+      return;
+    }
     addOperationToQueue(operation);
   }
 
@@ -173,22 +179,38 @@ public class TableDiskUsageCache {
       operation.future.get(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-    } catch (Exception ignored) {
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
     }
   }
 
   protected void addOperationToQueue(Operation operation) {
-    if (failedToRecover) {
+    if (failedToRecover || stop) {
       return;
     }
-    queue.add(operation);
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public int getQueueSize() {
+    return queue.size();
   }
 
   public void close() {
-    if (scheduledExecutorService != null) {
-      scheduledExecutorService.shutdownNow();
+    if (scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      stop = true;
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
-    writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
   }
 
   protected DataRegionTableSizeCacheWriter createWriter(
@@ -385,6 +407,7 @@ public class TableDiskUsageCache {
               return writer;
             }
             writer.close();
+            future.complete(null);
             return null;
           });
     }

Reply via email to