This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3267b4fa0d0 Add compaction selection cached device time index size
metric (#14582)
3267b4fa0d0 is described below
commit 3267b4fa0d08c91eb759de494d06c68bcd11af5c
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 3 18:22:19 2025 +0800
Add compaction selection cached device time index size metric (#14582)
* add compaction selection cached device time index size metric
* remove debug
---
.../db/service/metrics/CompactionMetrics.java | 24 +++++++
.../schedule/CompactionScheduleContext.java | 21 ++++--
.../impl/RewriteCrossSpaceCompactionSelector.java | 7 +-
.../utils/CrossSpaceCompactionCandidate.java | 4 +-
.../selector/utils/TsFileResourceCandidate.java | 76 +++++++++++++---------
.../dataregion/tsfile/TsFileResource.java | 34 +++++-----
.../rescon/memory/TsFileResourceManager.java | 2 +-
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
8 files changed, 110 insertions(+), 59 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index 8520b8f4664..631f3430535 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -46,6 +46,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class CompactionMetrics implements IMetricSet {
private static final String NOT_ALIGNED = "not_aligned";
@@ -723,6 +724,16 @@ public class CompactionMetrics implements IMetricSet {
private Histogram settleCompactionTaskSelectedFileSize =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private final AtomicLong totalCachedDeviceTimeIndexSize = new AtomicLong(0);
+
+ public void addSelectionCachedDeviceTimeIndexSize(long size) {
+ totalCachedDeviceTimeIndexSize.addAndGet(size);
+ }
+
+ public void decreaseSelectionCachedDeviceTimeIndexSize(long size) {
+ totalCachedDeviceTimeIndexSize.addAndGet(-size);
+ }
+
public void updateCompactionTaskSelectionNum(CompactionScheduleContext
context) {
seqInnerSpaceCompactionTaskSelectedNum.set(context.getSubmitSeqInnerSpaceCompactionTaskNum());
unseqInnerSpaceCompactionTaskSelectedNum.set(
@@ -913,6 +924,14 @@ public class CompactionMetrics implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
"settle");
+
+ metricService.createAutoGauge(
+ Metric.COMPACTION_SELECTION_CACHED_TIME_INDEX_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ metrics -> totalCachedDeviceTimeIndexSize.get(),
+ Tag.NAME.toString(),
+ "total_cached_device_time_index_size");
}
private void unbindCompactionTaskSelection(AbstractMetricService
metricService) {
@@ -935,6 +954,11 @@ public class CompactionMetrics implements IMetricSet {
Tag.NAME.toString(),
taskType);
}
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_SELECTION_CACHED_TIME_INDEX_SIZE.toString(),
+ Tag.NAME.toString(),
+ "total_cached_device_time_index_size");
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 7b9d7269184..4d51ba5010f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -20,16 +20,15 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.DeviceInfo;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import org.apache.tsfile.file.metadata.IDeviceID;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import java.util.HashMap;
import java.util.Map;
@@ -47,23 +46,31 @@ public class CompactionScheduleContext {
private int partiallyDirtyFileNum = 0;
// end region
- private final Map<TsFileResource, Map<IDeviceID, DeviceInfo>>
partitionFileDeviceInfoCache;
+ private final Map<TsFileResource, ArrayDeviceTimeIndex>
partitionFileDeviceInfoCache;
+ private long cachedDeviceInfoSize = 0;
public CompactionScheduleContext() {
this.partitionFileDeviceInfoCache = new HashMap<>();
}
public void addResourceDeviceTimeIndex(
- TsFileResource tsFileResource, Map<IDeviceID, DeviceInfo> deviceInfoMap)
{
- partitionFileDeviceInfoCache.put(tsFileResource, deviceInfoMap);
+ TsFileResource tsFileResource, ArrayDeviceTimeIndex deviceTimeIndex) {
+ partitionFileDeviceInfoCache.put(tsFileResource, deviceTimeIndex);
+ long deviceTimeIndexSize =
+
tsFileResource.getDeviceTimeIndexRamSize().orElse(deviceTimeIndex.calculateRamSize());
+ cachedDeviceInfoSize += deviceTimeIndexSize;
+
CompactionMetrics.getInstance().addSelectionCachedDeviceTimeIndexSize(deviceTimeIndexSize);
}
- public Map<IDeviceID, DeviceInfo> getResourceDeviceInfo(TsFileResource
resource) {
+ public ArrayDeviceTimeIndex getResourceDeviceInfo(TsFileResource resource) {
return partitionFileDeviceInfoCache.get(resource);
}
public void clearTimePartitionDeviceInfoCache() {
partitionFileDeviceInfoCache.clear();
+ CompactionMetrics.getInstance()
+ .decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize);
+ cachedDeviceInfoSize = 0;
}
public void incrementSubmitTaskNum(CompactionTaskType taskType, int num) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 92cccbfb24c..0ba247c1b6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -466,7 +467,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
InsertionCrossCompactionTaskResource result = new
InsertionCrossCompactionTaskResource();
boolean hasPreviousSeqFile = false;
- for (DeviceInfo unseqDeviceInfo : unseqFile.getDeviceInfoList()) {
+ for (Iterator<DeviceInfo> it = unseqFile.getDeviceInfoIterator();
it.hasNext(); ) {
+ DeviceInfo unseqDeviceInfo = it.next();
IDeviceID deviceId = unseqDeviceInfo.deviceId;
long startTimeOfUnSeqDevice = unseqDeviceInfo.startTime;
long endTimeOfUnSeqDevice = unseqDeviceInfo.endTime;
@@ -613,7 +615,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
return true;
}
- for (DeviceInfo device : candidate2.getDeviceInfoList()) {
+ for (Iterator<DeviceInfo> it = candidate2.getDeviceInfoIterator();
it.hasNext(); ) {
+ DeviceInfo device = it.next();
IDeviceID deviceId = device.deviceId;
if (!candidate1.containsDevice(deviceId)) {
continue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/CrossSpaceCompactionCandidate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index a0a0c6f347d..52b9d5269b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -97,7 +98,8 @@ public class CrossSpaceCompactionCandidate {
// unseq file resource has been deleted due to TTL and cannot upgrade to
DEVICE_TIME_INDEX
return false;
}
- for (DeviceInfo unseqDeviceInfo : unseqFile.getDeviceInfoList()) {
+ for (Iterator<DeviceInfo> it = unseqFile.getDeviceInfoIterator();
it.hasNext(); ) {
+ DeviceInfo unseqDeviceInfo = it.next();
IDeviceID deviceId = unseqDeviceInfo.deviceId;
boolean atLeastOneSeqFileSelected = false;
// The `previousSeqFile` means the seqFile which contains the device and
its endTime is just
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
index 7356f1aa624..6213c7ee89e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
@@ -30,10 +30,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.tsfile.file.metadata.IDeviceID;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Iterator;
import java.util.Set;
public class TsFileResourceCandidate {
@@ -46,10 +43,10 @@ public class TsFileResourceCandidate {
@SuppressWarnings("squid:S1104")
public boolean isValidCandidate;
- private Map<IDeviceID, DeviceInfo> deviceInfoMap;
+ private ArrayDeviceTimeIndex deviceTimeIndex;
private boolean hasDetailedDeviceInfo;
- private CompactionScheduleContext compactionScheduleContext;
+ private final CompactionScheduleContext compactionScheduleContext;
public TsFileResourceCandidate(TsFileResource tsFileResource,
CompactionScheduleContext context) {
this.resource = tsFileResource;
@@ -73,44 +70,43 @@ public class TsFileResourceCandidate {
private void prepareDeviceInfos() throws IOException {
boolean canCacheDeviceInfo = resource.getStatus() !=
TsFileResourceStatus.UNCLOSED;
- if (deviceInfoMap == null && compactionScheduleContext != null) {
+ if (deviceTimeIndex == null && compactionScheduleContext != null) {
// get device info from cache
- deviceInfoMap =
compactionScheduleContext.getResourceDeviceInfo(this.resource);
- hasDetailedDeviceInfo = true;
+ deviceTimeIndex =
compactionScheduleContext.getResourceDeviceInfo(this.resource);
}
- if (deviceInfoMap != null) {
+ if (deviceTimeIndex != null) {
+ hasDetailedDeviceInfo = true;
return;
}
- deviceInfoMap = new LinkedHashMap<>();
- if (resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) {
+ ITimeIndex timeIndex = resource.getTimeIndex();
+ if (timeIndex instanceof ArrayDeviceTimeIndex) {
+ if (resource.isClosed()) {
+ deviceTimeIndex = (ArrayDeviceTimeIndex) timeIndex;
+ } else {
+ deviceTimeIndex = new ArrayDeviceTimeIndex();
+ for (IDeviceID device : ((ArrayDeviceTimeIndex)
timeIndex).getDevices()) {
+ deviceTimeIndex.updateStartTime(device,
timeIndex.getStartTime(device));
+ deviceTimeIndex.updateEndTime(device, timeIndex.getEndTime(device));
+ }
+ }
+ } else {
// deserialize resource file
resource.readLock();
try {
+ // deleted file with degraded time index
if (!resource.resourceFileExists()) {
hasDetailedDeviceInfo = false;
+ deviceTimeIndex = new ArrayDeviceTimeIndex();
return;
}
- ArrayDeviceTimeIndex timeIndex =
CompactionUtils.buildDeviceTimeIndex(resource);
- for (IDeviceID deviceId : timeIndex.getDevices()) {
- deviceInfoMap.put(
- deviceId,
- new DeviceInfo(
- deviceId, timeIndex.getStartTime(deviceId),
timeIndex.getEndTime(deviceId)));
- }
+ deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
} finally {
resource.readUnlock();
}
- } else {
- for (IDeviceID deviceId : resource.getDevices()) {
- deviceInfoMap.put(
- deviceId,
- new DeviceInfo(
- deviceId, resource.getStartTime(deviceId),
resource.getEndTime(deviceId)));
- }
}
hasDetailedDeviceInfo = true;
if (compactionScheduleContext != null && canCacheDeviceInfo) {
- compactionScheduleContext.addResourceDeviceTimeIndex(this.resource,
deviceInfoMap);
+ compactionScheduleContext.addResourceDeviceTimeIndex(this.resource,
deviceTimeIndex);
}
}
@@ -118,24 +114,40 @@ public class TsFileResourceCandidate {
this.selected = true;
}
- public List<DeviceInfo> getDeviceInfoList() throws IOException {
+ public Iterator<DeviceInfo> getDeviceInfoIterator() throws IOException {
prepareDeviceInfos();
- return new ArrayList<>(deviceInfoMap.values());
+ return new Iterator<DeviceInfo>() {
+
+ private final Iterator<IDeviceID> deviceIterator =
deviceTimeIndex.getDevices().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return deviceIterator.hasNext();
+ }
+
+ @Override
+ public DeviceInfo next() {
+ IDeviceID deviceId = deviceIterator.next();
+ return new DeviceInfo(
+ deviceId, deviceTimeIndex.getStartTime(deviceId),
deviceTimeIndex.getEndTime(deviceId));
+ }
+ };
}
public Set<IDeviceID> getDevices() throws IOException {
prepareDeviceInfos();
- return deviceInfoMap.keySet();
+ return deviceTimeIndex.getDevices();
}
public DeviceInfo getDeviceInfoById(IDeviceID deviceId) throws IOException {
prepareDeviceInfos();
- return deviceInfoMap.get(deviceId);
+ return new DeviceInfo(
+ deviceId, deviceTimeIndex.getStartTime(deviceId),
deviceTimeIndex.getEndTime(deviceId));
}
public boolean containsDevice(IDeviceID deviceId) throws IOException {
prepareDeviceInfos();
- return deviceInfoMap.containsKey(deviceId);
+ return !deviceTimeIndex.definitelyNotContains(deviceId);
}
public boolean hasDetailedDeviceInfo() throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index cd905ed259d..d173de421e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -163,7 +164,7 @@ public class TsFileResource implements PersistentResource {
private TsFileID tsFileID;
- private long ramSize;
+ private long deviceTimeIndexRamSize;
private AtomicInteger tierLevel;
@@ -1048,12 +1049,21 @@ public class TsFileResource implements
PersistentResource {
* @return resource map size
*/
public long calculateRamSize() {
- if (ramSize == 0) {
- ramSize = INSTANCE_SIZE + timeIndex.calculateRamSize();
- return ramSize;
- } else {
- return ramSize;
+ if (timeIndex.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) {
+ return INSTANCE_SIZE + timeIndex.calculateRamSize();
+ }
+ if (deviceTimeIndexRamSize == 0) {
+ deviceTimeIndexRamSize = timeIndex.calculateRamSize();
+ }
+ return INSTANCE_SIZE + deviceTimeIndexRamSize;
+ }
+
+ // used for compaction
+ public Optional<Long> getDeviceTimeIndexRamSize() {
+ if (!this.isClosed()) {
+ return Optional.empty();
}
+ return Optional.of(deviceTimeIndexRamSize);
}
public long getMaxPlanIndex() {
@@ -1257,10 +1267,6 @@ public class TsFileResource implements
PersistentResource {
}
}
- public long getRamSize() {
- return ramSize;
- }
-
/** the DeviceTimeIndex degrade to FileTimeIndex and release memory */
public long degradeTimeIndex() {
TimeIndexLevel timeIndexLevel = TimeIndexLevel.valueOf(getTimeIndexType());
@@ -1274,12 +1280,8 @@ public class TsFileResource implements
PersistentResource {
long endTime = timeIndex.getMaxEndTime();
// replace the DeviceTimeIndex with FileTimeIndex
timeIndex = new FileTimeIndex(startTime, endTime);
-
- long beforeRamSize = ramSize;
-
- ramSize = INSTANCE_SIZE + timeIndex.calculateRamSize();
-
- return beforeRamSize - ramSize;
+ // deviceTimeIndexRamSize has already been calculated before
+ return deviceTimeIndexRamSize - timeIndex.calculateRamSize();
}
private void generatePathToTimeSeriesMetadataMap() throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
index 6fe0461404a..c7eede74197 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
@@ -78,7 +78,7 @@ public class TsFileResourceManager {
totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
degradedTimeIndexNum--;
} else {
- totalTimeIndexMemCost -= tsFileResource.getRamSize();
+ totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index b70aae9057f..064a47f2b89 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -93,6 +93,7 @@ public enum Metric {
COMPACTION_TASK_SELECTION_COST("compaction_task_selection_cost"),
COMPACTION_TASK_SELECTED_FILE("compaction_task_selected_file"),
COMPACTION_TASK_SELECTED_FILE_SIZE("compaction_task_selected_file_size"),
+
COMPACTION_SELECTION_CACHED_TIME_INDEX_SIZE("compaction_selection_cached_time_index_size"),
// schema engine related
MEM("mem"),
CACHE("cache"),