This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new 7ac968ce3b8 fix cache writer lifecycle
7ac968ce3b8 is described below
commit 7ac968ce3b8f0604e57da6f2acd19ecff87b6531
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jan 26 14:40:22 2026 +0800
fix cache writer lifecycle
---
.../db/storageengine/dataregion/DataRegion.java | 3 +-
.../tableDiskUsageCache/TableDiskUsageCache.java | 77 +++++++++++-----------
.../TableDiskUsageCacheReader.java | 5 +-
3 files changed, 42 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 44a54c769e3..de2981a9767 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -417,8 +417,7 @@ public class DataRegion implements IDataRegionForQuery {
IoTDBThreadPoolFactory.newSingleThreadExecutor(
databaseName + "-" + dataRegionIdString + "-UpgradeMod");
- TableDiskUsageCache.getInstance()
- .registerRegion(databaseName, Integer.parseInt(dataRegionIdString));
+ TableDiskUsageCache.getInstance().registerRegion(this);
// recover tsfiles unless consensus protocol is ratis and storage engine
is not ready
if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index eb19aeffe27..a0f2fa4bab2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
@@ -135,20 +136,20 @@ public class TableDiskUsageCache {
}
public CompletableFuture<Pair<TsFileTableSizeCacheReader,
IObjectTableSizeCacheReader>> startRead(
- String database, int regionId, boolean readTsFileCache, boolean
readObjectFileCache) {
+ DataRegion dataRegion, boolean readTsFileCache, boolean
readObjectFileCache) {
StartReadOperation operation =
- new StartReadOperation(database, regionId, readTsFileCache,
readObjectFileCache);
+ new StartReadOperation(dataRegion, readTsFileCache,
readObjectFileCache);
addOperationToQueue(operation);
return operation.future;
}
- public void endRead(String database, int regionId) {
- EndReadOperation operation = new EndReadOperation(database, regionId);
+ public void endRead(DataRegion dataRegion) {
+ EndReadOperation operation = new EndReadOperation(dataRegion);
addOperationToQueue(operation);
}
- public void registerRegion(String database, int regionId) {
- RegisterRegionOperation operation = new RegisterRegionOperation(database,
regionId);
+ public void registerRegion(DataRegion region) {
+ RegisterRegionOperation operation = new RegisterRegionOperation(region);
addOperationToQueue(operation);
}
@@ -177,8 +178,9 @@ public class TableDiskUsageCache {
writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
}
- protected DataRegionTableSizeCacheWriter createWriter(String database, int
regionId) {
- return new DataRegionTableSizeCacheWriter(database, regionId);
+ protected DataRegionTableSizeCacheWriter createWriter(
+ String database, int regionId, DataRegion region) {
+ return new DataRegionTableSizeCacheWriter(database, regionId, region);
}
protected TsFileTableSizeCacheReader createTsFileCacheReader(
@@ -218,43 +220,33 @@ public class TableDiskUsageCache {
}
protected static class StartReadOperation extends Operation {
+ protected final DataRegion region;
protected final boolean readTsFileCache;
protected final boolean readObjectFileCache;
public CompletableFuture<Pair<TsFileTableSizeCacheReader,
IObjectTableSizeCacheReader>> future =
new CompletableFuture<>();
public StartReadOperation(
- String database, int regionId, boolean readTsFileCache, boolean
readObjectFileCache) {
- super(database, regionId);
+ DataRegion dataRegion, boolean readTsFileCache, boolean
readObjectFileCache) {
+ super(dataRegion.getDatabaseName(),
dataRegion.getDataRegionId().getId());
+ this.region = dataRegion;
this.readTsFileCache = readTsFileCache;
this.readObjectFileCache = readObjectFileCache;
}
@Override
public void apply(TableDiskUsageCache tableDiskUsageCache) throws
IOException {
+ DataRegionTableSizeCacheWriter writer =
tableDiskUsageCache.writerMap.get(regionId);
try {
- DataRegionTableSizeCacheWriter writer =
- tableDiskUsageCache.writerMap.computeIfAbsent(
- regionId, k -> tableDiskUsageCache.createWriter(database,
regionId));
- // It is safe to always increase activeReaderNum here. Before a
DataRegion is removed, it is
- // first marked as deleted, and all table size queries will skip
DataRegions that are
- // already marked deleted.
- // Under this guarantee, waiting for activeReaderNum to reach zero
will not be blocked by
- // newly created readers, and the region can be safely removed.
- writer.increaseActiveReaderNum();
- if (writer.getRemovedFuture() != null) {
+ if (writer == null || writer.getRemovedFuture() != null) {
// region is removed
future.complete(
new Pair<>(
- new TsFileTableSizeCacheReader(
- 0,
- writer.tsFileCacheWriter.getKeyFile(),
- 0,
- writer.tsFileCacheWriter.getValueFile(),
- regionId),
+ new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
new EmptyObjectTableSizeCacheReader()));
return;
}
+ writer.increaseActiveReaderNum();
writer.flush();
TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
readTsFileCache ?
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
@@ -270,8 +262,11 @@ public class TableDiskUsageCache {
}
private static class EndReadOperation extends Operation {
- public EndReadOperation(String database, int regionId) {
- super(database, regionId);
+ protected final DataRegion region;
+
+ public EndReadOperation(DataRegion dataRegion) {
+ super(dataRegion.getDatabaseName(),
dataRegion.getDataRegionId().getId());
+ this.region = dataRegion;
}
@Override
@@ -279,6 +274,9 @@ public class TableDiskUsageCache {
tableDiskUsageCache.writerMap.computeIfPresent(
regionId,
(k, writer) -> {
+ if (writer.dataRegion != region) {
+ return writer;
+ }
writer.decreaseActiveReaderNum();
if (writer.getRemovedFuture() != null) {
writer.getRemovedFuture().complete(null);
@@ -303,12 +301,11 @@ public class TableDiskUsageCache {
@Override
public void apply(TableDiskUsageCache tableDiskUsageCache) throws
IOException {
- tableDiskUsageCache
- .writerMap
- .computeIfAbsent(
- regionId, k -> tableDiskUsageCache.createWriter(database,
tsFileID.regionId))
- .tsFileCacheWriter
- .write(tsFileID, tableSizeMap);
+ DataRegionTableSizeCacheWriter dataRegionTableSizeCacheWriter =
+ tableDiskUsageCache.writerMap.get(regionId);
+ if (dataRegionTableSizeCacheWriter != null) {
+ dataRegionTableSizeCacheWriter.tsFileCacheWriter.write(tsFileID,
tableSizeMap);
+ }
}
}
@@ -333,16 +330,18 @@ public class TableDiskUsageCache {
protected static class RegisterRegionOperation extends Operation {
+ protected final DataRegion dataRegion;
protected final CompletableFuture<Void> future = new CompletableFuture<>();
- public RegisterRegionOperation(String database, int regionId) {
- super(database, regionId);
+ public RegisterRegionOperation(DataRegion dataRegion) {
+ super(dataRegion.getDatabaseName(),
dataRegion.getDataRegionId().getId());
+ this.dataRegion = dataRegion;
}
@Override
public void apply(TableDiskUsageCache tableDiskUsageCache) {
tableDiskUsageCache.writerMap.computeIfAbsent(
- regionId, regionId -> tableDiskUsageCache.createWriter(database,
regionId));
+ regionId, regionId -> tableDiskUsageCache.createWriter(database,
regionId, dataRegion));
future.complete(null);
}
@@ -394,11 +393,13 @@ public class TableDiskUsageCache {
}
protected static class DataRegionTableSizeCacheWriter {
+ protected final DataRegion dataRegion;
protected final TsFileTableDiskUsageCacheWriter tsFileCacheWriter;
protected int activeReaderNum = 0;
protected CompletableFuture<Void> removedFuture;
- protected DataRegionTableSizeCacheWriter(String database, int regionId) {
+ protected DataRegionTableSizeCacheWriter(String database, int regionId,
DataRegion dataRegion) {
+ this.dataRegion = dataRegion;
this.tsFileCacheWriter = new TsFileTableDiskUsageCacheWriter(database,
regionId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index cdd72fc064c..c72bdc72e3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -76,8 +76,7 @@ public class TableDiskUsageCacheReader implements Closeable {
if (this.tsFileTableSizeCacheReader == null) {
this.prepareReaderFuture =
this.prepareReaderFuture == null
- ? TableDiskUsageCache.getInstance()
- .startRead(dataRegion.getDatabaseName(), regionId, true,
true)
+ ? TableDiskUsageCache.getInstance().startRead(dataRegion, true,
true)
: this.prepareReaderFuture;
do {
if (prepareReaderFuture.isDone()) {
@@ -180,7 +179,7 @@ public class TableDiskUsageCacheReader implements Closeable
{
closeTsFileTableSizeCacheReader();
closeObjectFileTableSizeCacheReader();
if (prepareReaderFuture != null) {
- TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(),
regionId);
+ TableDiskUsageCache.getInstance().endRead(dataRegion);
prepareReaderFuture = null;
}
dataRegionContext.releaseMemory();