This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/table_disk_usage_statistics_with_cache by this push:
     new 83482fa7381 add ut
83482fa7381 is described below

commit 83482fa738115a645116017b5232e70abbd3dc4a
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jan 28 15:36:27 2026 +0800

    add ut
---
 .../InformationSchemaContentSupplierFactory.java   |   3 +
 .../tableDiskUsageCache/TableDiskUsageCache.java   |  34 ++-
 .../TableDiskUsageCacheReader.java                 |   2 -
 .../tsfile/TsFileTableDiskUsageCacheWriter.java    |   1 -
 .../dataregion/utils/TableDiskUsageTest.java       | 234 +++++++++++++++++++++
 5 files changed, 266 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 9349bc9dfdb..5b486713e8f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -1214,6 +1214,9 @@ public class InformationSchemaContentSupplierFactory {
       AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity);
       try (final ConfigNodeClient client =
           ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        // It is better to use an async ConfigNode client here.
+        // Using a synchronous client may block the calling thread when the ConfigNode response is
+        // slow or temporarily unavailable, which can cause the operator to exceed its maxRunTime
         this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap();
       }
       this.dataRegionIterator =
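Note on the comment added above: the fix it suggests is to hand the blocking RPC to a dedicated executor so the operator thread only waits with a deadline. A minimal sketch of that pattern using only java.util.concurrent; blockingShowTables() and RPC_EXECUTOR are illustrative stand-ins, not the actual IoTDB async ConfigNode client API:

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class AsyncClientSketch {
      // Dedicated executor: a slow ConfigNode blocks this pool, not the operator thread.
      private static final ExecutorService RPC_EXECUTOR = Executors.newSingleThreadExecutor();

      // Stand-in for the blocking showTables4InformationSchema() RPC.
      static Map<String, String> blockingShowTables() throws InterruptedException {
        Thread.sleep(100); // simulated network latency
        return Collections.singletonMap("db1", "table1");
      }

      public static void main(String[] args) throws Exception {
        CompletableFuture<Map<String, String>> future =
            CompletableFuture.supplyAsync(
                () -> {
                  try {
                    return blockingShowTables();
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                  }
                },
                RPC_EXECUTOR);
        // The caller waits with a deadline instead of blocking indefinitely,
        // so it can yield before exceeding a budget such as maxRunTime.
        System.out.println(future.get(5, TimeUnit.SECONDS));
        RPC_EXECUTOR.shutdown();
      }
    }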
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index f7ee6057c86..9e8795b1594 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
@@ -48,7 +49,7 @@ public class TableDiskUsageCache {
   protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>(1000);
   // regionId -> writer mapping
   protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new HashMap<>();
-  protected final ScheduledExecutorService scheduledExecutorService;
+  protected ScheduledExecutorService scheduledExecutorService;
   private int processedOperationCountSinceLastPeriodicCheck = 0;
   protected volatile boolean failedToRecover = false;
   private volatile boolean stop = false;
@@ -155,7 +156,13 @@ public class TableDiskUsageCache {
       DataRegion dataRegion, boolean readTsFileCache, boolean readObjectFileCache) {
     StartReadOperation operation =
         new StartReadOperation(dataRegion, readTsFileCache, readObjectFileCache);
-    addOperationToQueue(operation);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(
+                  0, null, 0, null, dataRegion.getDataRegionId().getId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
     return operation.future;
   }
 
@@ -174,7 +181,9 @@ public class TableDiskUsageCache {
 
   public void remove(String database, int regionId) {
     RemoveRegionOperation operation = new RemoveRegionOperation(database, regionId);
-    addOperationToQueue(operation);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
     try {
       operation.future.get(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
@@ -184,15 +193,17 @@ public class TableDiskUsageCache {
     }
   }
 
-  protected void addOperationToQueue(Operation operation) {
+  protected boolean addOperationToQueue(Operation operation) {
     if (failedToRecover || stop) {
-      return;
+      return false;
     }
     try {
       queue.put(operation);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
+      return false;
     }
+    return true;
   }
 
   public int getQueueSize() {
@@ -208,11 +219,24 @@ public class TableDiskUsageCache {
       scheduledExecutorService.shutdown();
       scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
       writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+      writerMap.clear();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
   }
 
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.FILE_TIME_INDEX_RECORD.getName());
+      scheduledExecutorService.submit(this::run);
+    }
+  }
+
   protected DataRegionTableSizeCacheWriter createWriter(
       String database, int regionId, DataRegion region) {
     return new DataRegionTableSizeCacheWriter(database, regionId, region);
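The net effect of the changes in this file: addOperationToQueue now reports whether an operation was actually enqueued, and callers (startRead above, remove) complete or abandon the pending future accordingly instead of leaving it hanging after shutdown or a failed recovery. A self-contained sketch of that enqueue-with-fallback pattern; Operation and the String result type are stand-ins for the real cache classes:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    public class EnqueueWithFallbackSketch {
      static class Operation {
        final CompletableFuture<String> future = new CompletableFuture<>();
      }

      final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>(1000);
      volatile boolean stop = false;

      // Mirrors the new boolean contract: false means the operation was not enqueued.
      boolean addOperationToQueue(Operation op) {
        if (stop) {
          return false;
        }
        try {
          queue.put(op);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
        return true;
      }

      CompletableFuture<String> submit() {
        Operation op = new Operation();
        if (!addOperationToQueue(op)) {
          // Complete immediately with an empty result so no caller ever
          // blocks on a future that no worker thread will touch.
          op.future.complete("");
        }
        return op.future;
      }

      public static void main(String[] args) throws Exception {
        EnqueueWithFallbackSketch cache = new EnqueueWithFallbackSketch();
        cache.stop = true; // simulate a stopped cache
        System.out.println("fallback result: '" + cache.submit().get() + "'");
      }
    }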
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index c72bdc72e3c..6d26856824f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -39,7 +39,6 @@ import java.util.concurrent.CompletableFuture;
 public class TableDiskUsageCacheReader implements Closeable {
 
   private final DataRegion dataRegion;
-  private final int regionId;
   private final DataRegionTableSizeQueryContext dataRegionContext;
 
   private CompletableFuture<Pair<TsFileTableSizeCacheReader, IObjectTableSizeCacheReader>>
@@ -64,7 +63,6 @@ public class TableDiskUsageCacheReader implements Closeable {
       DataRegionTableSizeQueryContext dataRegionContext,
       boolean databaseHasOnlyOneTable) {
     this.dataRegion = dataRegion;
-    this.regionId = Integer.parseInt(dataRegion.getDataRegionIdString());
     this.dataRegionContext = dataRegionContext;
     this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable;
     this.timePartitionIterator =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
index 2ecd3d39662..7c60d38fc13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
@@ -130,7 +130,6 @@ public class TsFileTableDiskUsageCacheWriter extends AbstractTableSizeCacheWrite
   }
 
   private int getVersion(String fileName) throws NumberFormatException {
-    int version = 0;
     String removePrefixStr = fileName.substring(TSFILE_CACHE_KEY_FILENAME_PREFIX.length());
     int suffixIdx = removePrefixStr.indexOf('.');
     return Integer.parseInt(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java
new file mode 100644
index 00000000000..d8ddc173dfe
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel.CompactionTableModelTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.DataRegionTableSizeQueryContext;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheReader;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TableDiskUsageTest extends AbstractCompactionTest {
+
+  private DataRegion mockDataRegion;
+  private TsFileManager mockTsFileManager;
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+    TableDiskUsageCache.getInstance().ensureRunning();
+    mockDataRegion = Mockito.mock(DataRegion.class);
+    Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("test");
+    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn(new DataRegionId(0));
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
+    StorageEngine.getInstance().setDataRegion(new DataRegionId(0), mockDataRegion);
+    mockTsFileManager = new TsFileManager("test", "0", "");
+    Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(mockTsFileManager);
+    TableDiskUsageCache.getInstance().registerRegion(mockDataRegion);
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
+  }
+
+  @Test
+  public void test1() throws Exception {
+    TsFileResource resource = prepareFile(4);
+    mockTsFileManager.add(resource, true);
+
+    DataRegionTableSizeQueryContext context = new DataRegionTableSizeQueryContext(false);
+    // only query table1 and table2
+    Map<String, Long> timePartitionTableSizeMap = new HashMap<>();
+    timePartitionTableSizeMap.put("table1", 0L);
+    timePartitionTableSizeMap.put("table2", 0L);
+    context.addTimePartition(0, new TimePartitionTableSizeQueryContext(timePartitionTableSizeMap));
+    TableDiskUsageCacheReader reader =
+        new TableDiskUsageCacheReader(mockDataRegion, context, false);
+    try {
+      Assert.assertTrue(reader.prepareCacheReader(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.loadObjectFileTableSizeCache(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.prepareCachedTsFileIDKeys(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.checkAllFilesInTsFileManager(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.readCacheValueFilesAndUpdateResultMap(System.currentTimeMillis(), Long.MAX_VALUE));
+    } finally {
+      reader.close();
+    }
+    for (Map.Entry<Long, TimePartitionTableSizeQueryContext> timePartitionEntry :
+        context.getTimePartitionTableSizeQueryContextMap().entrySet()) {
+      TimePartitionTableSizeQueryContext timePartitionContext = timePartitionEntry.getValue();
+      for (Map.Entry<String, Long> entry :
+          timePartitionContext.getTableSizeResultMap().entrySet()) {
+        String tableName = entry.getKey();
+        long size = entry.getValue();
+        Assert.assertNotEquals("table3", tableName);
+        Assert.assertNotEquals("table4", tableName);
+        Assert.assertTrue(size > 0);
+      }
+    }
+  }
+
+  @Test
+  public void test2() throws Exception {
+    // cached
+    TsFileResource resource1 = prepareFile(4);
+    mockTsFileManager.add(resource1, true);
+    Map<String, Long> tableSizeMap = new HashMap<>();
+    tableSizeMap.put("table1", 10000000L);
+    tableSizeMap.put("table2", 10000000L);
+    TableDiskUsageCache.getInstance()
+        .write(mockDataRegion.getDatabaseName(), resource1.getTsFileID(), tableSizeMap);
+
+    TsFileResource resource2 = prepareFile(4);
+    mockTsFileManager.add(resource2, true);
+
+    DataRegionTableSizeQueryContext context = new DataRegionTableSizeQueryContext(false);
+    // only query table1 and table2
+    Map<String, Long> timePartitionTableSizeMap = new HashMap<>();
+    timePartitionTableSizeMap.put("table1", 0L);
+    timePartitionTableSizeMap.put("table2", 0L);
+    context.addTimePartition(0, new TimePartitionTableSizeQueryContext(timePartitionTableSizeMap));
+    TableDiskUsageCacheReader reader =
+        new TableDiskUsageCacheReader(mockDataRegion, context, false);
+    try {
+      Assert.assertTrue(reader.prepareCacheReader(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.loadObjectFileTableSizeCache(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.prepareCachedTsFileIDKeys(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.checkAllFilesInTsFileManager(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.readCacheValueFilesAndUpdateResultMap(System.currentTimeMillis(), Long.MAX_VALUE));
+    } finally {
+      reader.close();
+    }
+    for (Map.Entry<Long, TimePartitionTableSizeQueryContext> timePartitionEntry :
+        context.getTimePartitionTableSizeQueryContextMap().entrySet()) {
+      TimePartitionTableSizeQueryContext timePartitionContext = timePartitionEntry.getValue();
+      for (Map.Entry<String, Long> entry :
+          timePartitionContext.getTableSizeResultMap().entrySet()) {
+        String tableName = entry.getKey();
+        long size = entry.getValue();
+        Assert.assertNotEquals("table3", tableName);
+        Assert.assertNotEquals("table4", tableName);
+        Assert.assertTrue(size > 10000000L);
+      }
+    }
+  }
+
+  @Test
+  public void test3() throws Exception {
+    // deleted
+    TsFileResource resource1 = prepareFile(4);
+    Map<String, Long> tableSizeMap = new HashMap<>();
+    tableSizeMap.put("table1", 10000000L);
+    tableSizeMap.put("table2", 10000000L);
+    TableDiskUsageCache.getInstance()
+        .write(mockDataRegion.getDatabaseName(), resource1.getTsFileID(), tableSizeMap);
+
+    TsFileResource resource2 = prepareFile(4);
+    mockTsFileManager.add(resource2, true);
+
+    DataRegionTableSizeQueryContext context = new DataRegionTableSizeQueryContext(false);
+    // only query table1 and table2
+    Map<String, Long> timePartitionTableSizeMap = new HashMap<>();
+    timePartitionTableSizeMap.put("table1", 0L);
+    timePartitionTableSizeMap.put("table2", 0L);
+    context.addTimePartition(0, new TimePartitionTableSizeQueryContext(timePartitionTableSizeMap));
+    TableDiskUsageCacheReader reader =
+        new TableDiskUsageCacheReader(mockDataRegion, context, false);
+    try {
+      Assert.assertTrue(reader.prepareCacheReader(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.loadObjectFileTableSizeCache(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.prepareCachedTsFileIDKeys(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.checkAllFilesInTsFileManager(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.readCacheValueFilesAndUpdateResultMap(System.currentTimeMillis(), Long.MAX_VALUE));
+    } finally {
+      reader.close();
+    }
+    for (Map.Entry<Long, TimePartitionTableSizeQueryContext> timePartitionEntry :
+        context.getTimePartitionTableSizeQueryContextMap().entrySet()) {
+      TimePartitionTableSizeQueryContext timePartitionContext = timePartitionEntry.getValue();
+      for (Map.Entry<String, Long> entry :
+          timePartitionContext.getTableSizeResultMap().entrySet()) {
+        String tableName = entry.getKey();
+        long size = entry.getValue();
+        Assert.assertNotEquals("table3", tableName);
+        Assert.assertNotEquals("table4", tableName);
+        Assert.assertTrue(size < 10000000L && size > 0);
+      }
+    }
+  }
+
+  private TsFileResource prepareFile(int tableNum) throws IOException {
+    TsFileResource resource1 = createEmptyFileAndResource(true);
+    try (CompactionTableModelTestFileWriter writer =
+        new CompactionTableModelTestFileWriter(resource1)) {
+      for (int i = 0; i < tableNum; i++) {
+        writer.registerTableSchema("table" + i, Collections.singletonList("device"));
+        writer.startChunkGroup("table" + i, Collections.singletonList("d1"));
+        writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+            Arrays.asList("s0", "s1"),
+            new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            Arrays.asList(false, false));
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    return resource1;
+  }
+}
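To try the new test in isolation, the usual surefire invocation from the datanode module should suffice (assuming a standard IoTDB checkout and build):

    cd iotdb-core/datanode
    mvn test -Dtest=TableDiskUsageTest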
