This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch compression_in_show_regions in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7f279fcd6754a6ba5e9cdd4206c0f4ec6bf6151a Author: Tian Jiang <[email protected]> AuthorDate: Thu Jun 12 09:44:00 2025 +0800 add compression ratio in show regions --- .../heartbeat/DataNodeHeartbeatHandler.java | 5 + .../confignode/manager/load/cache/LoadCache.java | 11 ++ .../manager/partition/PartitionManager.java | 9 +- .../impl/DataNodeInternalRPCServiceImpl.java | 6 ++ .../execution/config/metadata/ShowRegionTask.java | 11 +- .../dataregion/flush/CompressionRatio.java | 116 +++++++++++++++++++-- .../dataregion/memtable/TsFileProcessor.java | 2 +- .../file/UnsealedTsFileRecoverPerformer.java | 2 +- .../dataregion/flush/CompressionRatioTest.java | 3 +- .../schema/column/ColumnHeaderConstant.java | 4 +- .../src/main/thrift/confignode.thrift | 1 + .../src/main/thrift/datanode.thrift | 1 + 12 files changed, 156 insertions(+), 15 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 3613c337862..e7a31b1dc73 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -158,6 +158,11 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<TDataNodeHe if (heartbeatResp.isSetRegionDisk()) { loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk()); } + if (heartbeatResp.isSetDataRegionRawDataSize()) { + loadManager + .getLoadCache() + .updateRegionRawSizeMap(nodeId, heartbeatResp.getDataRegionRawDataSize()); + } } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 666c676abdf..d864a43f7b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -93,6 +93,8 @@ public class LoadCache { private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; // Map<NodeId, Map<RegionGroupId, RegionSize>> private final Map<Integer, Map<Integer, Long>> regionSizeMap; + // Map<NodeId, Map<RegionGroupId, RegionRawSize>> + private final Map<Integer, Map<Integer, Long>> regionRawSizeMap; // Map<RegionGroupId, ConsensusGroupCache> private final Map<TConsensusGroupId, ConsensusGroupCache> consensusGroupCacheMap; // Map<DataNodeId, confirmedConfigNodes> @@ -105,6 +107,7 @@ public class LoadCache { this.heartbeatProcessingMap = new ConcurrentHashMap<>(); this.regionGroupCacheMap = new ConcurrentHashMap<>(); this.regionSizeMap = new ConcurrentHashMap<>(); + this.regionRawSizeMap = new ConcurrentHashMap<>(); this.consensusGroupCacheMap = new ConcurrentHashMap<>(); this.confirmedConfigNodeMap = new ConcurrentHashMap<>(); this.topologyGraph = new HashMap<>(); @@ -807,7 +810,15 @@ public class LoadCache { this.regionSizeMap.put(dataNodeId, regionSizeMap); } + public void updateRegionRawSizeMap(int dataNodeId, Map<Integer, Long> regionRawSizeMap) { + this.regionRawSizeMap.put(dataNodeId, regionRawSizeMap); + } + public Map<Integer, Map<Integer, Long>> getRegionSizeMap() { return regionSizeMap; } + + public Map<Integer, Map<Integer, Long>> getRegionRawSizeMap() { + return regionRawSizeMap; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index fa051b74155..8259efb567f 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -1097,10 +1097,17 @@ public class PartitionManager { .getOrDefault(regionInfo.getDataNodeId(), Collections.emptyMap()) .getOrDefault(regionInfo.getConsensusGroupId().getId(), -1L); regionInfo.setTsFileSize(regionSize); + + long rawDataSize = + getLoadManager() + .getLoadCache() + .getRegionRawSizeMap() + .getOrDefault(regionInfo.getDataNodeId(), Collections.emptyMap()) + .getOrDefault(regionInfo.getConsensusGroupId().getId(), -1L); + regionInfo.setRawDataSize(rawDataSize); }); return regionInfoListResp; - } catch (final ConsensusException e) { LOGGER.warn(CONSENSUS_READ_ERROR, e); final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 368b3b027d1..01e8533f644 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -173,6 +173,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTask import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler; +import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; @@ -1981,6 +1982,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface resp.setLoadSample(loadSample); resp.setRegionDisk(FileMetrics.getInstance().getRegionSizeMap()); + Map<Integer, Long> regionRawDataSize = new HashMap<>(); + CompressionRatio.getInstance() + .getDataRegionRatioMap() + .forEach((key, value) -> regionRawDataSize.put(Integer.parseInt(key), value.getLeft())); + resp.setDataRegionRawDataSize(regionRawDataSize); } AuthorityChecker.getAuthorityFetcher().refreshToken(); resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowRegionTask.java index ae40ca6284d..95ddd28ff74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowRegionTask.java @@ -110,15 +110,22 @@ public class ShowRegionTask implements IConfigTask { BytesUtils.valueOf(DateTimeUtils.convertLongToDate(regionInfo.getCreateTime()))); // region size String regionSizeStr = ""; + double compressionRatio = Double.NaN; if (regionInfo.getConsensusGroupId().getType().ordinal() == TConsensusGroupType.DataRegion.ordinal()) { - if (regionInfo.getTsFileSize() != -1) { - regionSizeStr = FileUtils.humanReadableByteCountSI(regionInfo.getTsFileSize()); + long tsFileSize = regionInfo.getTsFileSize(); + if (tsFileSize != -1) { + regionSizeStr = FileUtils.humanReadableByteCountSI(tsFileSize); } else { regionSizeStr = "Unknown"; } + long rawDataSize = regionInfo.getRawDataSize(); + if (rawDataSize != -1) { + 
compressionRatio = (double) rawDataSize / tsFileSize; + } } builder.getColumnBuilder(12).writeBinary(BytesUtils.valueOf(regionSizeStr)); + builder.getColumnBuilder(13).writeDouble(compressionRatio); builder.declarePosition(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java index 6253717386e..c22afac0817 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java @@ -25,13 +25,17 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.commons.io.FileUtils; import org.apache.tsfile.utils.FilePathUtils; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** @@ -65,6 +69,9 @@ public class CompressionRatio { /** The data size on disk */ private long totalDiskSize = 0L; + /** DataRegionId -> (memorySize, diskSize) */ + private Map<String, Pair<Long, Long>> dataRegionRatioMap = new ConcurrentHashMap<>(); + private File directory; private String oldFileName = String.format(RATIO_FILE_PATH_FORMAT, 0, 0); @@ -84,8 +91,9 @@ public class CompressionRatio { * Whenever the task of closing a file ends, the compression ratio of the file is calculated and * call this method. 
*/ - public synchronized void updateRatio(long memorySize, long diskSize) throws IOException { - File oldFile = SystemFileFactory.INSTANCE.getFile(directory, oldFileName); + public synchronized void updateRatio(long memorySize, long diskSize, String dataRegionId) + throws IOException { + File oldDataNodeFile = SystemFileFactory.INSTANCE.getFile(directory, oldFileName); totalMemorySize.addAndGet(memorySize); totalDiskSize += diskSize; @@ -95,12 +103,38 @@ public class CompressionRatio { memorySize, totalMemorySize); } - File newFile = + File newDataNodeFile = SystemFileFactory.INSTANCE.getFile( directory, String.format( Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, totalMemorySize.get(), totalDiskSize)); - persist(oldFile, newFile); + persist(oldDataNodeFile, newDataNodeFile); + + Pair<Long, Long> dataRegionCompressionRatio = + dataRegionRatioMap.computeIfAbsent(dataRegionId, id -> new Pair<>(0L, 0L)); + File oldDataRegionFile = + SystemFileFactory.INSTANCE.getFile( + directory, + String.format( + Locale.ENGLISH, + RATIO_FILE_PATH_FORMAT, + dataRegionCompressionRatio.getLeft(), + dataRegionCompressionRatio.getRight()) + + "." + + dataRegionId); + dataRegionCompressionRatio.setLeft(dataRegionCompressionRatio.getLeft() + memorySize); + dataRegionCompressionRatio.setRight(dataRegionCompressionRatio.getRight() + diskSize); + File newDataRegionFile = + SystemFileFactory.INSTANCE.getFile( + directory, + String.format( + Locale.ENGLISH, + RATIO_FILE_PATH_FORMAT, + dataRegionCompressionRatio.getLeft(), + dataRegionCompressionRatio.getRight()) + + "." 
+ + dataRegionId); + persist(oldDataRegionFile, newDataRegionFile); } /** Get the average compression ratio for all closed files */ @@ -108,6 +142,20 @@ public class CompressionRatio { return (double) totalMemorySize.get() / totalDiskSize; } + public double getRatio(String dataRegionId) { + Pair<Long, Long> ratioPair = + dataRegionRatioMap.compute( + dataRegionId, + (dId, oldPair) -> { + if (oldPair == null) { + return new Pair<>(0L, 0L); + } + // return a copy to avoid concurrent modification + return new Pair<>(oldPair.left, oldPair.right); + }); + return (double) ratioPair.left / ratioPair.right; + } + private void persist(File oldFile, File newFile) throws IOException { checkDirectoryExist(); if (!oldFile.exists()) { @@ -132,12 +180,47 @@ public class CompressionRatio { } } - /** Restore compression ratio statistics from disk when system restart */ - void restore() throws IOException { - if (!directory.exists()) { + private void recoverDataRegionRatio(File[] ratioFiles) { + if (ratioFiles == null) { return; } - File[] ratioFiles = directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX)); + + Map<String, Integer> validFileIndex = new HashMap<>(); + for (int i = 0, ratioFilesLength = ratioFiles.length; i < ratioFilesLength; i++) { + File ratioFile = ratioFiles[i]; + String fileName = ratioFile.getName(); + String ratioPart = fileName.substring(0, fileName.lastIndexOf(".")); + String dataRegionId = fileName.substring(fileName.lastIndexOf(".") + 1); + + String[] fileNameArray = ratioPart.split("-"); + // fileNameArray.length != 3 means the compression ratio may be negative, ignore it + if (fileNameArray.length == 3) { + try { + Pair<Long, Long> regionRatioPair = + dataRegionRatioMap.computeIfAbsent(dataRegionId, id -> new Pair<>(0L, 0L)); + long diskSize = Long.parseLong(fileNameArray[2]); + if (diskSize > regionRatioPair.getRight()) { + regionRatioPair.setRight(diskSize); + regionRatioPair.setLeft(Long.parseLong(fileNameArray[1])); + 
validFileIndex.put(dataRegionId, i); + } + } catch (NumberFormatException ignore) { + // ignore illegal compression file name + } + } + } + validFileIndex.values().forEach(i -> ratioFiles[i] = null); + + for (File ratioFile : ratioFiles) { + if (ratioFile != null) { + if (!ratioFile.delete()) { + LOGGER.warn("Cannot delete ratio file {}", ratioFile.getAbsolutePath()); + } + } + } + } + + private void recoverDataNodeRatio(File[] ratioFiles) throws IOException { // First try to recover from the new version of the file, parse the file name, and get the file // with the largest disk size value if (ratioFiles != null && ratioFiles.length > 0) { @@ -191,6 +274,19 @@ public class CompressionRatio { } } + /** Restore compression ratio statistics from disk when system restart */ + void restore() throws IOException { + if (!directory.exists()) { + return; + } + File[] dataNodeRatioFiles = + directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX) && !name.contains(".")); + recoverDataNodeRatio(dataNodeRatioFiles); + File[] dataRegionRatioFiles = + directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX) && name.contains(".")); + recoverDataRegionRatio(dataRegionRatioFiles); + } + public static void deleteRedundantFilesByIndex(File[] files, int index) throws IOException { for (int i = 0; i < files.length; i++) { if (i != index) { @@ -215,6 +311,10 @@ public class CompressionRatio { totalDiskSize = 0L; } + public Map<String, Pair<Long, Long>> getDataRegionRatioMap() { + return dataRegionRatioMap; + } + public static CompressionRatio getInstance() { return CompressionRatioHolder.INSTANCE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 861b72c923a..5617fd9f99f 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1715,7 +1715,7 @@ public class TsFileProcessor { String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId(); WritingMetrics.getInstance() .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio); - CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos()); + CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos(), dataRegionId); } catch (IOException e) { logger.error( "{}: {} update compression ratio failed", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index c955f67c79c..f90cb0353bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -292,7 +292,7 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform String.format("%.2f", compressionRatio), memTableSize, writer.getPos()); - CompressionRatio.getInstance().updateRatio(memTableSize, writer.getPos()); + CompressionRatio.getInstance().updateRatio(memTableSize, writer.getPos(), dataRegionId); } catch (IOException e) { logger.error( "{}: {} update compression ratio failed", diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java index 
f088fee719a..d78d61168f7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java @@ -63,7 +63,7 @@ public class CompressionRatioTest { long totalDiskSize = 5; for (int i = 0; i < 5; i++) { - this.compressionRatio.updateRatio(10, 5); + this.compressionRatio.updateRatio(10, 5, "dr" + i); if (!new File( directory, String.format( @@ -75,6 +75,7 @@ public class CompressionRatioTest { fail(); } assertEquals(2, this.compressionRatio.getRatio(), 0.1); + assertEquals(2, this.compressionRatio.getRatio("dr" + i), 0.1); totalMemorySize += 10; totalDiskSize += 5; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 72fe1ce0236..684cbedda78 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -163,6 +163,7 @@ public class ColumnHeaderConstant { public static final String ROLE = "Role"; public static final String CREATE_TIME = "CreateTime"; public static final String TSFILE_SIZE = "TsFileSize"; + public static final String COMPRESSION_RATIO = "CompressionRatio"; // column names for show datanodes public static final String SCHEMA_REGION_NUM = "SchemaRegionNum"; @@ -429,7 +430,8 @@ public class ColumnHeaderConstant { new ColumnHeader(INTERNAL_ADDRESS, TSDataType.TEXT), new ColumnHeader(ROLE, TSDataType.TEXT), new ColumnHeader(CREATE_TIME, TSDataType.TEXT), - new ColumnHeader(TSFILE_SIZE, TSDataType.TEXT)); + new ColumnHeader(TSFILE_SIZE, TSDataType.TEXT), + new ColumnHeader(COMPRESSION_RATIO, TSDataType.DOUBLE)); public static final List<ColumnHeader> 
showAINodesColumnHeaders = ImmutableList.of( diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 3f4c42d058a..cce97f015dc 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -721,6 +721,7 @@ struct TRegionInfo { 10: optional i64 createTime 11: optional string internalAddress 12: optional i64 tsFileSize + 13: optional i64 rawDataSize } struct TShowRegionResp { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 70456a952f3..9d2daa7e14f 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -310,6 +310,7 @@ struct TDataNodeHeartbeatResp { 14: optional list<bool> pipeCompletedList 15: optional list<i64> pipeRemainingEventCountList 16: optional list<double> pipeRemainingTimeList + 17: optional map<i32, i64> dataRegionRawDataSize } struct TPipeHeartbeatReq {
