This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 1edaaf318c3 [To rel/1.2] [metrics] Refactor compaction read write
throughput metrics (#10301)
1edaaf318c3 is described below
commit 1edaaf318c36566c345e355957dc17d6d3e994ef
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jun 25 21:57:56 2023 +0800
[To rel/1.2] [metrics] Refactor compaction read write throughput metrics
(#10301)
---
.../commons/concurrent/DataNodeThreadModule.java | 1 +
.../iotdb/commons/concurrent/ThreadName.java | 11 +-
.../impl/ReadChunkCompactionPerformer.java | 21 ++-
.../execute/utils/MultiTsFileDeviceIterator.java | 19 +-
.../fast/AlignedSeriesCompactionExecutor.java | 21 +++
.../readchunk/AlignedSeriesCompactionExecutor.java | 99 ++++++-----
.../readchunk/SingleSeriesCompactionExecutor.java | 56 ++----
.../utils/writer/AbstractCompactionWriter.java | 73 +++-----
.../writer/AbstractCrossCompactionWriter.java | 24 +--
.../writer/AbstractInnerCompactionWriter.java | 13 +-
.../utils/writer/FastCrossCompactionWriter.java | 4 +-
.../writer/ReadPointCrossCompactionWriter.java | 2 +-
.../writer/ReadPointInnerCompactionWriter.java | 2 +-
.../compaction/io/CompactionTsFileReader.java | 178 +++++++++++++++++++
.../compaction/io/CompactionTsFileWriter.java | 130 ++++++++++++++
.../compaction/schedule/CompactionTaskManager.java | 10 --
.../schedule/constant/CompactionIoDataType.java | 33 ++--
.../iotdb/db/engine/storagegroup/DataRegion.java | 5 +
.../db/service/metrics/CompactionMetrics.java | 192 ++++++++++++++-------
.../iotdb/db/service/metrics/FileMetrics.java | 1 +
.../iotdb/tsfile/read/TsFileDeviceIterator.java | 3 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 12 ++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 28 ++-
23 files changed, 667 insertions(+), 271 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
index f0d61466497..3023983f05a 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
@@ -34,6 +34,7 @@ public enum DataNodeThreadModule {
JVM,
LOG_BACK,
METRICS,
+ SYSTEM,
OTHER,
UNKNOWN
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e28887791b4..e9a136a7b08 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -137,6 +137,9 @@ public enum ThreadName {
PROMETHEUS_REACTOR_HTTP_NIO("reactor-http-nio"),
PROMETHEUS_REACTOR_HTTP_EPOLL("reactor-http-epoll"),
PROMETHEUS_BOUNDED_ELASTIC("boundedElastic-evictor"),
+ // -------------------------- System --------------------------
+ FORK_JOIN_POOL("ForkJoinPool"),
+ TIMER("timer"),
// -------------------------- Other --------------------------
TTL_CHECK("TTL-CHECK"),
SETTLE("Settle"),
@@ -149,8 +152,8 @@ public enum ThreadName {
// the unknown thread name is used for metrics
UNKOWN("UNKNOWN");
- private final String name;
private static final Logger log = LoggerFactory.getLogger(ThreadName.class);
+ private final String name;
private static Set<ThreadName> queryThreadNames =
new HashSet<>(
Arrays.asList(
@@ -264,6 +267,9 @@ public enum ThreadName {
PROMETHEUS_REACTOR_HTTP_NIO,
PROMETHEUS_REACTOR_HTTP_EPOLL,
PROMETHEUS_BOUNDED_ELASTIC));
+
+ private static Set<ThreadName> systemThreadNames =
+ new HashSet<>(Arrays.asList(FORK_JOIN_POOL, TIMER));
private static Set<ThreadName> otherThreadNames =
new HashSet<>(
Arrays.asList(
@@ -297,6 +303,7 @@ public enum ThreadName {
computeThreadNames,
jvmThreadNames,
metricsThreadNames,
+ systemThreadNames,
otherThreadNames
};
DataNodeThreadModule[] modules =
@@ -313,6 +320,7 @@ public enum ThreadName {
DataNodeThreadModule.COMPUTE,
DataNodeThreadModule.JVM,
DataNodeThreadModule.METRICS,
+ DataNodeThreadModule.SYSTEM,
DataNodeThreadModule.OTHER
};
@@ -339,6 +347,7 @@ public enum ThreadName {
return module;
}
}
+ log.debug("The module for this thread is unknown: {}", givenThreadName);
return null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index da0c9c91dd8..27d4d48afc7 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction.execute.performer.impl;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -27,6 +26,8 @@ import
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
import
org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator;
import
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor;
import
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.rescon.SystemInfo;
@@ -34,18 +35,12 @@ import
org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private TsFileResource targetResource;
private List<TsFileResource> seqFiles;
private CompactionTaskSummary summary;
@@ -71,8 +66,12 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
/
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
*
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
try (MultiTsFileDeviceIterator deviceIterator = new
MultiTsFileDeviceIterator(seqFiles);
- TsFileIOWriter writer =
- new TsFileIOWriter(targetResource.getTsFile(), true,
sizeForFileWriter)) {
+ CompactionTsFileWriter writer =
+ new CompactionTsFileWriter(
+ targetResource.getTsFile(),
+ true,
+ sizeForFileWriter,
+ CompactionType.INNER_SEQ_COMPACTION)) {
while (deviceIterator.hasNextDevice()) {
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
String device = deviceInfo.left;
@@ -113,7 +112,7 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
private void compactAlignedSeries(
String device,
TsFileResource targetResource,
- TsFileIOWriter writer,
+ CompactionTsFileWriter writer,
MultiTsFileDeviceIterator deviceIterator)
throws IOException, InterruptedException {
checkThreadInterrupted();
@@ -153,7 +152,7 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
private void compactNotAlignedSeries(
String device,
TsFileResource targetResource,
- TsFileIOWriter writer,
+ CompactionTsFileWriter writer,
MultiTsFileDeviceIterator deviceIterator)
throws IOException, MetadataException, InterruptedException {
writer.startChunkGroup(device);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 7e2484e0d71..28329e0d5cb 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -70,7 +72,9 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
this.tsFileResourcesSortedByDesc,
TsFileResource::compareFileCreationOrderByDesc);
try {
for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
- TsFileSequenceReader reader = new
TsFileSequenceReader(tsFileResource.getTsFilePath());
+ CompactionTsFileReader reader =
+ new CompactionTsFileReader(
+ tsFileResource.getTsFilePath(),
CompactionType.INNER_SEQ_COMPACTION);
readerMap.put(tsFileResource, reader);
deviceIteratorMap.put(tsFileResource,
reader.getAllDevicesIteratorWithIsAligned());
}
@@ -112,8 +116,19 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Collections.sort(
this.tsFileResourcesSortedByDesc,
TsFileResource::compareFileCreationOrderByDesc);
this.readerMap = readerMap;
+
+ CompactionType type = null;
+ if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
+ type = CompactionType.CROSS_COMPACTION;
+ } else if (seqResources.isEmpty()) {
+ type = CompactionType.INNER_UNSEQ_COMPACTION;
+ } else {
+ type = CompactionType.INNER_SEQ_COMPACTION;
+ }
+
for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
- TsFileSequenceReader reader = new
TsFileSequenceReader(tsFileResource.getTsFilePath());
+ TsFileSequenceReader reader =
+ new CompactionTsFileReader(tsFileResource.getTsFilePath(), type);
readerMap.put(tsFileResource, reader);
deviceIteratorMap.put(tsFileResource,
reader.getAllDevicesIteratorWithIsAligned());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index cd7adbb1adc..4d68016a978 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element
import
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement;
import
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement;
import
org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -88,6 +89,8 @@ public class AlignedSeriesCompactionExecutor extends
SeriesCompactionExecutor {
@Override
protected void compactFiles()
throws PageException, IOException, WriteProcessException,
IllegalPathException {
+ markStartOfAlignedSeries();
+
while (!fileList.isEmpty()) {
List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0));
@@ -96,6 +99,24 @@ public class AlignedSeriesCompactionExecutor extends
SeriesCompactionExecutor {
compactChunks();
}
+
+ markEndOfAlignedSeries();
+ }
+
+ private void markStartOfAlignedSeries() {
+ for (TsFileSequenceReader reader : readerCacheMap.values()) {
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+ }
+ }
+ }
+
+ private void markEndOfAlignedSeries() {
+ for (TsFileSequenceReader reader : readerCacheMap.values()) {
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
+ }
+ }
}
/** Deserialize files into chunk metadatas and put them into the chunk
metadata queue. */
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index 6690d133ac8..08eeaba45a7 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -20,11 +20,9 @@ package
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -39,9 +37,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.ArrayList;
@@ -56,14 +51,12 @@ public class AlignedSeriesCompactionExecutor {
private final LinkedList<Pair<TsFileSequenceReader,
List<AlignedChunkMetadata>>>
readerAndChunkMetadataList;
private final TsFileResource targetResource;
- private final TsFileIOWriter writer;
+ private final CompactionTsFileWriter writer;
private final AlignedChunkWriterImpl chunkWriter;
private final List<IMeasurementSchema> schemaList;
private long remainingPointInChunkWriter = 0L;
private final CompactionTaskSummary summary;
- private final RateLimiter rateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
private final long chunkSizeThreshold =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -74,7 +67,7 @@ public class AlignedSeriesCompactionExecutor {
String device,
TsFileResource targetResource,
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
readerAndChunkMetadataList,
- TsFileIOWriter writer,
+ CompactionTsFileWriter writer,
CompactionTaskSummary summary)
throws IOException {
this.device = device;
@@ -101,27 +94,13 @@ public class AlignedSeriesCompactionExecutor {
for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair
:
readerAndChunkMetadataList) {
TsFileSequenceReader reader = readerListPair.left;
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+ }
List<AlignedChunkMetadata> alignedChunkMetadataList =
readerListPair.right;
- for (AlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
- List<IChunkMetadata> valueChunkMetadataList =
- alignedChunkMetadata.getValueChunkMetadataList();
- for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
- if (chunkMetadata == null) {
- continue;
- }
- if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
- continue;
- }
- measurementSet.add(chunkMetadata.getMeasurementUid());
- Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
- ChunkHeader header = chunk.getHeader();
- schemaSet.add(
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType()));
- }
+ collectSchemaFromOneFile(alignedChunkMetadataList, reader, schemaSet,
measurementSet);
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
}
}
List<IMeasurementSchema> schemaList = new ArrayList<>(schemaSet);
@@ -129,12 +108,46 @@ public class AlignedSeriesCompactionExecutor {
return schemaList;
}
+ private void collectSchemaFromOneFile(
+ List<AlignedChunkMetadata> alignedChunkMetadataList,
+ TsFileSequenceReader reader,
+ Set<MeasurementSchema> schemaSet,
+ Set<String> measurementSet)
+ throws IOException {
+ for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList)
{
+ List<IChunkMetadata> valueChunkMetadataList =
+ alignedChunkMetadata.getValueChunkMetadataList();
+ for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
+ if (chunkMetadata == null) {
+ continue;
+ }
+ if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
+ continue;
+ }
+ measurementSet.add(chunkMetadata.getMeasurementUid());
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+ ChunkHeader header = chunk.getHeader();
+ schemaSet.add(
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType()));
+ }
+ }
+ }
+
public void execute() throws IOException {
while (readerAndChunkMetadataList.size() > 0) {
Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair =
readerAndChunkMetadataList.removeFirst();
TsFileSequenceReader reader = readerListPair.left;
List<AlignedChunkMetadata> alignedChunkMetadataList =
readerListPair.right;
+
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+ }
+
TsFileAlignedSeriesReaderIterator readerIterator =
new TsFileAlignedSeriesReaderIterator(reader,
alignedChunkMetadataList, schemaList);
while (readerIterator.hasNext()) {
@@ -142,22 +155,16 @@ public class AlignedSeriesCompactionExecutor {
readerIterator.nextReader();
summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
-
CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
compactOneAlignedChunk(
nextAlignedChunkInfo.getReader(),
nextAlignedChunkInfo.getNotNullChunkNum());
}
+ if (reader instanceof CompactionTsFileReader) {
+ ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
+ }
}
if (remainingPointInChunkWriter != 0L) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(writer);
+ writer.writeChunk(chunkWriter);
}
writer.checkMetadataSizeAndMayFlush();
}
@@ -191,15 +198,7 @@ public class AlignedSeriesCompactionExecutor {
private void flushChunkWriterIfLargeEnough() throws IOException {
if (remainingPointInChunkWriter >= chunkPointNumThreshold
|| chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold *
schemaList.size()) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(writer);
+ writer.writeChunk(chunkWriter);
remainingPointInChunkWriter = 0L;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 47d80ccf4ea..0c695eac833 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -22,11 +22,8 @@ package
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.LinkedList;
@@ -52,15 +46,13 @@ public class SingleSeriesCompactionExecutor {
private String device;
private PartialPath series;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList;
- private TsFileIOWriter fileWriter;
+ private CompactionTsFileWriter fileWriter;
private TsFileResource targetResource;
private IMeasurementSchema schema;
private ChunkWriterImpl chunkWriter;
private Chunk cachedChunk;
private ChunkMetadata cachedChunkMetadata;
- private RateLimiter compactionRateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
// record the min time and max time to update the target resource
private long minStartTimestamp = Long.MAX_VALUE;
private long maxEndTimestamp = Long.MIN_VALUE;
@@ -80,7 +72,7 @@ public class SingleSeriesCompactionExecutor {
PartialPath series,
IMeasurementSchema measurementSchema,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
- TsFileIOWriter fileWriter,
+ CompactionTsFileWriter fileWriter,
TsFileResource targetResource) {
this.device = series.getDevice();
this.series = series;
@@ -97,7 +89,7 @@ public class SingleSeriesCompactionExecutor {
public SingleSeriesCompactionExecutor(
PartialPath series,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
- TsFileIOWriter fileWriter,
+ CompactionTsFileWriter fileWriter,
TsFileResource targetResource,
CompactionTaskSummary summary) {
this.device = series.getDevice();
@@ -129,10 +121,6 @@ public class SingleSeriesCompactionExecutor {
if (this.chunkWriter == null) {
constructChunkWriterFromReadChunk(currentChunk);
}
- CompactionMetrics.getInstance()
- .recordReadInfo(
- (long) currentChunk.getHeader().getSerializedSize()
- + currentChunk.getHeader().getDataSize());
// if this chunk is modified, deserialize it into points
if (chunkMetadata.getDeleteIntervalList() != null) {
@@ -155,7 +143,7 @@ public class SingleSeriesCompactionExecutor {
// after all the chunk of this sensor is read, flush the remaining data
if (cachedChunk != null) {
- flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true);
+ flushChunkToFileWriter(cachedChunk, cachedChunkMetadata);
cachedChunk = null;
cachedChunkMetadata = null;
} else if (pointCountInChunkWriter != 0L) {
@@ -211,7 +199,7 @@ public class SingleSeriesCompactionExecutor {
// there is no points remaining in ChunkWriter and no cached chunk
// flush it to file directly
summary.increaseDirectlyFlushChunkNum(1);
- flushChunkToFileWriter(chunk, chunkMetadata, false);
+ flushChunkToFileWriter(chunk, chunkMetadata);
}
}
@@ -327,36 +315,20 @@ public class SingleSeriesCompactionExecutor {
}
}
- private void flushChunkToFileWriter(
- Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws
IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter,
getChunkSize(chunk));
+ private void flushChunkToFileWriter(Chunk chunk, ChunkMetadata
chunkMetadata) throws IOException {
if (chunkMetadata.getStartTime() < minStartTimestamp) {
minStartTimestamp = chunkMetadata.getStartTime();
}
if (chunkMetadata.getEndTime() > maxEndTimestamp) {
maxEndTimestamp = chunkMetadata.getEndTime();
}
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- isCachedChunk ? ProcessChunkType.MERGE_CHUNK :
ProcessChunkType.FLUSH_CHUNK,
- false,
- getChunkSize(chunk));
fileWriter.writeChunk(chunk, chunkMetadata);
}
private void flushChunkWriterIfLargeEnough() throws IOException {
if (pointCountInChunkWriter >= targetChunkPointNum
|| chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriter);
+ fileWriter.writeChunk(chunkWriter);
pointCountInChunkWriter = 0L;
}
}
@@ -364,22 +336,14 @@ public class SingleSeriesCompactionExecutor {
private void flushCachedChunkIfLargeEnough() throws IOException {
if (cachedChunk.getChunkStatistic().getCount() >= targetChunkPointNum
|| getChunkSize(cachedChunk) >= targetChunkSize) {
- flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true);
+ flushChunkToFileWriter(cachedChunk, cachedChunkMetadata);
cachedChunk = null;
cachedChunkMetadata = null;
}
}
private void flushChunkWriter() throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriter);
+ fileWriter.writeChunk(chunkWriter);
pointCountInChunkWriter = 0L;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 1f4b13b9a14..7b7fbc2ee1a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -19,10 +19,7 @@
package org.apache.iotdb.db.engine.compaction.execute.utils.writer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -50,9 +44,6 @@ import java.util.List;
public abstract class AbstractCompactionWriter implements AutoCloseable {
protected int subTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
- private RateLimiter compactionRateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
-
// check if there is unseq error point during writing
protected long[] lastTime = new long[subTaskNum];
@@ -135,43 +126,43 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
*/
public abstract void checkAndMayFlushChunkMetadata() throws IOException;
- protected void writeDataPoint(long timestamp, TsPrimitiveType value,
IChunkWriter iChunkWriter) {
- if (iChunkWriter instanceof ChunkWriterImpl) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) iChunkWriter;
- switch (chunkWriter.getDataType()) {
+ protected void writeDataPoint(long timestamp, TsPrimitiveType value,
IChunkWriter chunkWriter) {
+ if (chunkWriter instanceof ChunkWriterImpl) {
+ ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+ switch (chunkWriterImpl.getDataType()) {
case TEXT:
- chunkWriter.write(timestamp, value.getBinary());
+ chunkWriterImpl.write(timestamp, value.getBinary());
break;
case DOUBLE:
- chunkWriter.write(timestamp, value.getDouble());
+ chunkWriterImpl.write(timestamp, value.getDouble());
break;
case BOOLEAN:
- chunkWriter.write(timestamp, value.getBoolean());
+ chunkWriterImpl.write(timestamp, value.getBoolean());
break;
case INT64:
- chunkWriter.write(timestamp, value.getLong());
+ chunkWriterImpl.write(timestamp, value.getLong());
break;
case INT32:
- chunkWriter.write(timestamp, value.getInt());
+ chunkWriterImpl.write(timestamp, value.getInt());
break;
case FLOAT:
- chunkWriter.write(timestamp, value.getFloat());
+ chunkWriterImpl.write(timestamp, value.getFloat());
break;
default:
- throw new UnsupportedOperationException("Unknown data type " +
chunkWriter.getDataType());
+ throw new UnsupportedOperationException(
+ "Unknown data type " + chunkWriterImpl.getDataType());
}
} else {
- AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
iChunkWriter;
+ AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
alignedChunkWriter.write(timestamp, value.getVector());
}
}
- protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter
iChunkWriter, int subTaskId)
+ protected void sealChunk(
+ CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int
subTaskId)
throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
synchronized (targetWriter) {
- iChunkWriter.writeToFileWriter(targetWriter);
+ targetWriter.writeChunk(chunkWriter);
}
chunkPointNumArray[subTaskId] = 0;
}
@@ -188,19 +179,18 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
throws IOException;
protected void flushNonAlignedChunkToFileWriter(
- TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata,
int subTaskId)
+ CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata
chunkMetadata, int subTaskId)
throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter,
getChunkSize(chunk));
synchronized (targetWriter) {
// seal last chunk to file writer
- chunkWriters[subTaskId].writeToFileWriter(targetWriter);
+ targetWriter.writeChunk(chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId] = 0;
targetWriter.writeChunk(chunk, chunkMetadata);
}
}
protected void flushAlignedChunkToFileWriter(
- TsFileIOWriter targetWriter,
+ CompactionTsFileWriter targetWriter,
Chunk timeChunk,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
@@ -210,11 +200,12 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
synchronized (targetWriter) {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriters[subTaskId];
// seal last chunk to file writer
- alignedChunkWriter.writeToFileWriter(targetWriter);
+ targetWriter.writeChunk(alignedChunkWriter);
chunkPointNumArray[subTaskId] = 0;
+ targetWriter.markStartingWritingAligned();
+
// flush time chunk
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter,
getChunkSize(timeChunk));
targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
// flush value chunks
@@ -231,10 +222,10 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
Statistics.getStatsByType(valueChunkWriter.getDataType()));
continue;
}
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, getChunkSize(valueChunk));
targetWriter.writeChunk(valueChunk, (ChunkMetadata)
valueChunkMetadatas.get(i));
}
+
+ targetWriter.markEndingWritingAligned();
}
}
@@ -292,22 +283,14 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
}
protected void checkChunkSizeAndMayOpenANewChunk(
- TsFileIOWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId,
boolean isCrossSpace)
+ CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int
subTaskId)
throws IOException {
if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
// if chunk point num reaches the check point, then check if the chunk
size over threshold
lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
- if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize,
targetChunkPointNum, false)) {
- sealChunk(fileWriter, iChunkWriter, subTaskId);
+ if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize,
targetChunkPointNum, false)) {
+ sealChunk(fileWriter, chunkWriter, subTaskId);
lastCheckIndex = 0;
- CompactionMetrics.getInstance()
- .recordWriteInfo(
- isCrossSpace
- ? CompactionType.CROSS_COMPACTION
- : CompactionType.INNER_UNSEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- isAlign,
- iChunkWriter.estimateMaxSeriesMemSize());
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 44409808daf..277379cb9dd 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -29,7 +31,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,7 +40,7 @@ import java.util.Map;
public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWriter {
// target fileIOWriters
- protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>();
+ protected List<CompactionTsFileWriter> targetFileWriters = new ArrayList<>();
// source tsfiles
private List<TsFileResource> seqTsFileResources;
@@ -77,8 +78,11 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
boolean enableMemoryControl =
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
for (int i = 0; i < targetResources.size(); i++) {
this.targetFileWriters.add(
- new TsFileIOWriter(
- targetResources.get(i).getTsFile(), enableMemoryControl,
memorySizeForEachWriter));
+ new CompactionTsFileWriter(
+ targetResources.get(i).getTsFile(),
+ enableMemoryControl,
+ memorySizeForEachWriter,
+ CompactionType.CROSS_COMPACTION));
isEmptyFile[i] = true;
}
this.seqTsFileResources = seqFileResources;
@@ -99,7 +103,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
@Override
public void endChunkGroup() throws IOException {
for (int i = 0; i < seqTsFileResources.size(); i++) {
- TsFileIOWriter targetFileWriter = targetFileWriters.get(i);
+ CompactionTsFileWriter targetFileWriter = targetFileWriters.get(i);
if (isDeviceExistedInTargetFiles[i]) {
// update resource
CompactionUtils.updateResource(targetResources.get(i),
targetFileWriter, deviceId);
@@ -129,7 +133,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId]++;
checkChunkSizeAndMayOpenANewChunk(
- targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId,
true);
+ targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
isDeviceExistedInTargetFiles[fileIndex] = true;
isEmptyFile[fileIndex] = false;
lastTime[subTaskId] = timestamp;
@@ -153,7 +157,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
@Override
public void close() throws IOException {
- for (TsFileIOWriter targetWriter : targetFileWriters) {
+ for (CompactionTsFileWriter targetWriter : targetFileWriters) {
if (targetWriter != null && targetWriter.canWrite()) {
targetWriter.close();
}
@@ -165,8 +169,8 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
@Override
public void checkAndMayFlushChunkMetadata() throws IOException {
for (int i = 0; i < targetFileWriters.size(); i++) {
- TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
- fileIOWriter.checkMetadataSizeAndMayFlush();
+ CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i);
+ fileIoWriter.checkMetadataSizeAndMayFlush();
}
}
@@ -234,7 +238,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
@Override
public long getWriterSize() throws IOException {
long totalSize = 0;
- for (TsFileIOWriter writer : targetFileWriters) {
+ for (CompactionTsFileWriter writer : targetFileWriters) {
totalSize += writer.getPos();
}
return totalSize;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 6404730d0e4..3795cf1d71b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -20,18 +20,19 @@ package
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
public abstract class AbstractInnerCompactionWriter extends
AbstractCompactionWriter {
- protected TsFileIOWriter fileWriter;
+ protected CompactionTsFileWriter fileWriter;
protected boolean isEmptyFile;
@@ -50,7 +51,11 @@ public abstract class AbstractInnerCompactionWriter extends
AbstractCompactionWr
*
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
boolean enableMemoryControl =
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
this.fileWriter =
- new TsFileIOWriter(targetFileResource.getTsFile(),
enableMemoryControl, sizeForFileWriter);
+ new CompactionTsFileWriter(
+ targetFileResource.getTsFile(),
+ enableMemoryControl,
+ sizeForFileWriter,
+ CompactionType.INNER_UNSEQ_COMPACTION);
this.targetResource = targetFileResource;
isEmptyFile = true;
}
@@ -77,7 +82,7 @@ public abstract class AbstractInnerCompactionWriter extends
AbstractCompactionWr
public void write(TimeValuePair timeValuePair, int subTaskId) throws
IOException {
writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId]++;
- checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId, false);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
isEmptyFile = false;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
index 109d66c03b3..b3464080e76 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
@@ -150,7 +150,7 @@ public class FastCrossCompactionWriter extends
AbstractCrossCompactionWriter {
// check chunk size and may open a new chunk
checkChunkSizeAndMayOpenANewChunk(
- targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId,
true);
+ targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
isDeviceExistedInTargetFiles[fileIndex] = true;
isEmptyFile[fileIndex] = false;
@@ -178,7 +178,7 @@ public class FastCrossCompactionWriter extends
AbstractCrossCompactionWriter {
// check chunk size and may open a new chunk
checkChunkSizeAndMayOpenANewChunk(
- targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId,
true);
+ targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
isDeviceExistedInTargetFiles[fileIndex] = true;
isEmptyFile[fileIndex] = false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 772a5b18bd9..497afa3d32e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -57,7 +57,7 @@ public class ReadPointCrossCompactionWriter extends
AbstractCrossCompactionWrite
}
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
checkChunkSizeAndMayOpenANewChunk(
- targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter,
subTaskId, true);
+ targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter,
subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
isEmptyFile[seqFileIndexArray[subTaskId]] = false;
lastTime[subTaskId] = timestamps.getEndTime();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
index 0fdb712084d..f0dd4eef1f9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
@@ -42,7 +42,7 @@ public class ReadPointInnerCompactionWriter extends
AbstractInnerCompactionWrite
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl)
this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
- checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId,
false);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
isEmptyFile = false;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
new file mode 100644
index 00000000000..6e8b7b3d890
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link TsFileSequenceReader} for compaction that accounts for the bytes it reads.
+ *
+ * <p>Every successful {@link #readData(long, int)} call adds the requested size to a running
+ * total. The overridden read entry points sample that total before and after delegating to the
+ * parent class and report the difference to {@link CompactionMetrics}, tagged as aligned data,
+ * not-aligned data, or metadata.
+ */
+public class CompactionTsFileReader extends TsFileSequenceReader {
+  /** Running total, in bytes, of everything requested through {@link #readData(long, int)}. */
+  private AtomicLong readDataSize = new AtomicLong(0L);
+
+  /** Compaction type under which every read of this reader is reported. */
+  CompactionType compactionType;
+
+  /** Whether chunk reads are currently attributed to an aligned series. */
+  private volatile boolean readingAlignedSeries = false;
+
+  /**
+   * Opens the given file for reading on behalf of a compaction task.
+   *
+   * @param file The file to be read.
+   * @param compactionType The type of compaction running.
+   * @throws IOException If the parent reader fails to open the file.
+   */
+  public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException {
+    super(file);
+    this.compactionType = compactionType;
+  }
+
+  /** Delegates to the parent and, once it returned normally, adds {@code totalSize} to the total. */
+  @Override
+  protected ByteBuffer readData(long position, int totalSize) throws IOException {
+    ByteBuffer data = super.readData(position, totalSize);
+    readDataSize.addAndGet(totalSize);
+    return data;
+  }
+
+  /** Attributes subsequent chunk reads to an aligned series. */
+  public void markStartOfAlignedSeries() {
+    readingAlignedSeries = true;
+  }
+
+  /** Attributes subsequent chunk reads to a not-aligned series again. */
+  public void markEndOfAlignedSeries() {
+    readingAlignedSeries = false;
+  }
+
+  /**
+   * Reads one chunk and reports the bytes it cost as aligned or not-aligned data, depending on the
+   * flag set by {@link #markStartOfAlignedSeries()} / {@link #markEndOfAlignedSeries()}.
+   */
+  @Override
+  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
+    // Chunk reads are serialised so that the two samples of the counter bracket exactly one chunk.
+    // NOTE(review): the metadata overrides below sample the same counter without taking this
+    // lock - confirm they are never invoked concurrently with chunk reads on the same reader.
+    synchronized (this) {
+      long bytesBefore = readDataSize.get();
+      Chunk result = super.readMemChunk(metaData);
+      long bytesRead = readDataSize.get() - bytesBefore;
+      CompactionIoDataType ioType;
+      if (readingAlignedSeries) {
+        ioType = CompactionIoDataType.ALIGNED;
+      } else {
+        ioType = CompactionIoDataType.NOT_ALIGNED;
+      }
+      CompactionMetrics.getInstance().recordReadInfo(compactionType, ioType, bytesRead);
+      return result;
+    }
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException {
+    long bytesBefore = readDataSize.get();
+    TsFileDeviceIterator deviceIterator = super.getAllDevicesIteratorWithIsAligned();
+    recordMetadataRead(bytesBefore);
+    return deviceIterator;
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
+      long startOffset, long endOffset) throws IOException {
+    long bytesBefore = readDataSize.get();
+    List<IChunkMetadata> metadataList =
+        super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset, endOffset);
+    recordMetadataRead(bytesBefore);
+    return metadataList;
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public void getDevicesAndEntriesOfOneLeafNode(
+      Long startOffset, Long endOffset, Queue<Pair<String, long[]>> measurementNodeOffsetQueue)
+      throws IOException {
+    long bytesBefore = readDataSize.get();
+    super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset, measurementNodeOffsetQueue);
+    recordMetadataRead(bytesBefore);
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException {
+    long bytesBefore = readDataSize.get();
+    MetadataIndexNode indexNode = super.readMetadataIndexNode(start, end);
+    recordMetadataRead(bytesBefore);
+    return indexNode;
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
+      getTimeseriesMetadataOffsetByDevice(
+          MetadataIndexNode measurementNode,
+          Set<String> excludedMeasurementIds,
+          boolean needChunkMetadata)
+          throws IOException {
+    long bytesBefore = readDataSize.get();
+    Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> offsets =
+        super.getTimeseriesMetadataOffsetByDevice(
+            measurementNode, excludedMeasurementIds, needChunkMetadata);
+    recordMetadataRead(bytesBefore);
+    return offsets;
+  }
+
+  /** Delegates to the parent and reports the bytes read meanwhile as metadata. */
+  @Override
+  public void getDeviceTimeseriesMetadata(
+      List<TimeseriesMetadata> timeseriesMetadataList,
+      MetadataIndexNode measurementNode,
+      Set<String> excludedMeasurementIds,
+      boolean needChunkMetadata)
+      throws IOException {
+    long bytesBefore = readDataSize.get();
+    super.getDeviceTimeseriesMetadata(
+        timeseriesMetadataList, measurementNode, excludedMeasurementIds, needChunkMetadata);
+    recordMetadataRead(bytesBefore);
+  }
+
+  /**
+   * Reports the growth of the byte counter since {@code bytesBefore} was sampled as metadata I/O.
+   *
+   * @param bytesBefore value of the byte counter sampled before the parent call
+   */
+  private void recordMetadataRead(long bytesBefore) {
+    long bytesRead = readDataSize.get() - bytesBefore;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, bytesRead);
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
new file mode 100644
index 00000000000..4dac7843aae
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link TsFileIOWriter} for compaction target files that throttles and meters its own output.
+ *
+ * <p>Each write entry point below measures how many bytes it appended (the growth of {@link
+ * #getPos()}, or the parent's return value for metadata flushes), acquires that many permits from
+ * the merge write rate limiter of {@link CompactionTaskManager}, and reports the amount to {@link
+ * CompactionMetrics} as aligned data, not-aligned data, or metadata.
+ */
+public class CompactionTsFileWriter extends TsFileIOWriter {
+  /** Compaction type under which all writes of this writer are reported. */
+  CompactionType type;
+
+  /**
+   * Whether chunks passed to {@link #writeChunk(Chunk, ChunkMetadata)} are reported as aligned
+   * data; toggled by the two {@code mark...WritingAligned} methods.
+   */
+  private volatile boolean isWritingAligned = false;
+
+  /**
+   * Creates a writer for one compaction target file.
+   *
+   * @param file target file, passed through to the parent writer
+   * @param enableMemoryControl passed through to the parent writer
+   * @param maxMetadataSize passed through to the parent writer
+   * @param type compaction type used to tag every reported write
+   * @throws IOException if the parent writer cannot be created
+   */
+  public CompactionTsFileWriter(
+      File file, boolean enableMemoryControl, long maxMetadataSize, CompactionType type)
+      throws IOException {
+    super(file, enableMemoryControl, maxMetadataSize);
+    this.type = type;
+  }
+
+  /** Reports subsequent {@link #writeChunk(Chunk, ChunkMetadata)} calls as aligned data. */
+  public void markStartingWritingAligned() {
+    isWritingAligned = true;
+  }
+
+  /** Reports subsequent {@link #writeChunk(Chunk, ChunkMetadata)} calls as not-aligned data. */
+  public void markEndingWritingAligned() {
+    isWritingAligned = false;
+  }
+
+  /**
+   * Flushes the content of a chunk writer into this file, then throttles and reports the bytes
+   * written. The data is reported as aligned exactly when {@code chunkWriter} is an {@link
+   * AlignedChunkWriterImpl}.
+   *
+   * @param chunkWriter chunk writer whose buffered content is written to this file
+   * @throws IOException if writing fails
+   */
+  public void writeChunk(IChunkWriter chunkWriter) throws IOException {
+    boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl;
+    long beforeOffset = this.getPos();
+    chunkWriter.writeToFileWriter(this);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    acquireWritePermits(writtenDataSize);
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            type,
+            isAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED,
+            writtenDataSize);
+  }
+
+  /**
+   * Writes an already serialized chunk, then throttles and reports the bytes written. Whether the
+   * data counts as aligned is taken from the flag set by {@link #markStartingWritingAligned()}.
+   */
+  @Override
+  public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
+    long beforeOffset = this.getPos();
+    super.writeChunk(chunk, chunkMetadata);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    acquireWritePermits(writtenDataSize);
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            type,
+            isWritingAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED,
+            writtenDataSize);
+  }
+
+  /** Writes an empty value chunk; the bytes are always reported as aligned data. */
+  @Override
+  public void writeEmptyValueChunk(
+      String measurementId,
+      CompressionType compressionType,
+      TSDataType tsDataType,
+      TSEncoding encodingType,
+      Statistics<? extends Serializable> statistics)
+      throws IOException {
+    long beforeOffset = this.getPos();
+    super.writeEmptyValueChunk(
+        measurementId, compressionType, tsDataType, encodingType, statistics);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
+    acquireWritePermits(writtenDataSize);
+  }
+
+  /**
+   * Delegates to the parent and throttles/reports the size it returns as metadata.
+   *
+   * @return the value returned by the parent implementation
+   */
+  @Override
+  public int checkMetadataSizeAndMayFlush() throws IOException {
+    int size = super.checkMetadataSizeAndMayFlush();
+    acquireWritePermits(size);
+    CompactionMetrics.getInstance().recordWriteInfo(type, CompactionIoDataType.METADATA, size);
+    return size;
+  }
+
+  /** Ends the file; everything the parent appends while doing so is reported as metadata. */
+  @Override
+  public void endFile() throws IOException {
+    long beforeSize = this.getPos();
+    super.endFile();
+    long writtenDataSize = this.getPos() - beforeSize;
+    acquireWritePermits(writtenDataSize);
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
+  }
+
+  /**
+   * Acquires {@code byteCount} permits from the merge write rate limiter; does nothing for a
+   * non-positive count.
+   *
+   * <p>The limiter takes an {@code int}, so a plain {@code (int)} cast of a count of 2^31 or more
+   * would wrap to a negative or truncated number of permits. The count is therefore charged in
+   * slices of at most {@link Integer#MAX_VALUE}.
+   *
+   * @param byteCount number of bytes that were written
+   */
+  private static void acquireWritePermits(long byteCount) {
+    while (byteCount >= Integer.MAX_VALUE) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(Integer.MAX_VALUE);
+      byteCount -= Integer.MAX_VALUE;
+    }
+    if (byteCount > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) byteCount);
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 6dd13065941..baad46f2a0d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -252,16 +252,6 @@ public class CompactionTaskManager implements IService {
mergeWriteRateLimiter.setRate(throughout);
}
}
- /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
- public static void mergeRateLimiterAcquire(RateLimiter limiter, long
bytesLength) {
- while (bytesLength >= Integer.MAX_VALUE) {
- limiter.acquire(Integer.MAX_VALUE);
- bytesLength -= Integer.MAX_VALUE;
- }
- if (bytesLength > 0) {
- limiter.acquire((int) bytesLength);
- }
- }
public synchronized void removeRunningTaskFuture(AbstractCompactionTask
task) {
String regionWithSG = getSGWithRegionId(task.getStorageGroupName(),
task.getDataRegionId());
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
similarity index 74%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
copy to
server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
index f0d61466497..be5f76eb290 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
@@ -17,23 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.commons.concurrent;
+package org.apache.iotdb.db.engine.compaction.schedule.constant;
-public enum DataNodeThreadModule {
- QUERY,
- MPP,
- COMPACTION,
- WAL,
- FLUSH,
- SCHEMA_ENGINE,
- CLIENT_SERVICE,
- IOT_CONSENSUS,
- RATIS_CONSENSUS,
- COMPUTE,
- SYNC,
- JVM,
- LOG_BACK,
- METRICS,
- OTHER,
- UNKNOWN
+/**
+ * Category of bytes read or written by compaction: data of not-aligned series, data of aligned
+ * series, or file metadata. Each constant carries a fixed integer code.
+ */
+public enum CompactionIoDataType {
+  NOT_ALIGNED(0),
+  ALIGNED(1),
+  METADATA(2);
+
+  // Enum constants are shared singletons, so the code must not be reassignable from outside.
+  // NOTE(review): presumably this is the index into per-type counter arrays - confirm at callers.
+  private final int value;
+
+  CompactionIoDataType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the integer code of this category.
+   *
+   * @return 0 for {@link #NOT_ALIGNED}, 1 for {@link #ALIGNED}, 2 for {@link #METADATA}
+   */
+  public int getValue() {
+    return value;
+  }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index de9a822facf..468619f8a69 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1550,6 +1550,11 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
tsFileResourceList.forEach(
x -> {
+ FileMetrics.getInstance()
+ .deleteFile(
+ new long[] {x.getTsFileSize()},
+ x.isSeq(),
+ Collections.singletonList(x.getTsFile().getName()));
if (x.getModFile().exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index d482be8c9ec..f80ff96e664 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -25,8 +25,8 @@ import
org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
@@ -44,7 +44,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CompactionMetrics implements IMetricSet {
- private static final List<String> TYPES = Arrays.asList("aligned",
"not_aligned");
+ private static final String NOT_ALIGNED = "not_aligned";
+ private static final String ALIGNED = "aligned";
+ private static final String METADATA = "metadata";
+ private static final List<String> TYPES = Arrays.asList(ALIGNED,
NOT_ALIGNED);
private static final CompactionMetrics INSTANCE = new CompactionMetrics();
private long lastUpdateTime = 0L;
private static final long UPDATE_INTERVAL = 10_000L;
@@ -57,47 +60,58 @@ public class CompactionMetrics implements IMetricSet {
private final AtomicInteger finishSeqInnerCompactionTaskNum = new
AtomicInteger(0);
private final AtomicInteger finishUnseqInnerCompactionTaskNum = new
AtomicInteger(0);
private final AtomicInteger finishCrossCompactionTaskNum = new
AtomicInteger(0);
+ // compaction type -> Counter[ Not-Aligned, Aligned, Metadata]
+ private final Map<String, Counter[]> writeCounters = new
ConcurrentHashMap<>();
+ private final Map<String, Counter[]> readCounters = new
ConcurrentHashMap<>();
private CompactionMetrics() {
for (String type : TYPES) {
- Map<CompactionType, Map<ProcessChunkType, Counter>>
compactionTypeProcessChunkTypeMap =
- writeInfoCounterMap.computeIfAbsent(type, k -> new
ConcurrentHashMap<>());
- for (CompactionType compactionType : CompactionType.values()) {
- Map<ProcessChunkType, Counter> counterMap =
- compactionTypeProcessChunkTypeMap.computeIfAbsent(
- compactionType, k -> new ConcurrentHashMap<>());
- for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
- counterMap.put(processChunkType,
DoNothingMetricManager.DO_NOTHING_COUNTER);
- }
- }
+ readCounters.put(
+ type,
+ new Counter[] {
+ DoNothingMetricManager.DO_NOTHING_COUNTER,
+ DoNothingMetricManager.DO_NOTHING_COUNTER,
+ DoNothingMetricManager.DO_NOTHING_COUNTER
+ });
+ writeCounters.put(
+ type,
+ new Counter[] {
+ DoNothingMetricManager.DO_NOTHING_COUNTER,
+ DoNothingMetricManager.DO_NOTHING_COUNTER,
+ DoNothingMetricManager.DO_NOTHING_COUNTER
+ });
}
}
- // region compaction write info
- private Map<String, Map<CompactionType, Map<ProcessChunkType, Counter>>>
writeInfoCounterMap =
- new ConcurrentHashMap<>();
private Counter totalCompactionWriteInfoCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private void bindWriteInfo(AbstractMetricService metricService) {
- for (String type : TYPES) {
- Map<CompactionType, Map<ProcessChunkType, Counter>>
compactionTypeProcessChunkTypeMap =
- writeInfoCounterMap.computeIfAbsent(type, k -> new
ConcurrentHashMap<>());
- for (CompactionType compactionType : CompactionType.values()) {
- Map<ProcessChunkType, Counter> counterMap =
- compactionTypeProcessChunkTypeMap.computeIfAbsent(
- compactionType, k -> new ConcurrentHashMap<>());
- for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
- Counter counter =
- metricService.getOrCreateCounter(
- Metric.DATA_WRITTEN.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction_" + compactionType.toString(),
- Tag.STATUS.toString(),
- type + "_" + processChunkType.toString());
- counterMap.put(processChunkType, counter);
- }
- }
+ for (CompactionType compactionType : CompactionType.values()) {
+ writeCounters.put(
+ compactionType.toString(),
+ new Counter[] {
+ metricService.getOrCreateCounter(
+ Metric.DATA_WRITTEN.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ NOT_ALIGNED),
+ metricService.getOrCreateCounter(
+ Metric.DATA_WRITTEN.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ ALIGNED),
+ metricService.getOrCreateCounter(
+ Metric.DATA_WRITTEN.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ METADATA)
+ });
}
totalCompactionWriteInfoCounter =
metricService.getOrCreateCounter(
@@ -110,22 +124,28 @@ public class CompactionMetrics implements IMetricSet {
}
private void unbindWriteInfo(AbstractMetricService metricService) {
- for (String type : TYPES) {
- for (CompactionType compactionType : CompactionType.values()) {
- for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
- metricService.remove(
- MetricType.COUNTER,
- Metric.DATA_WRITTEN.toString(),
- Tag.NAME.toString(),
- "compaction_" + compactionType.toString(),
- Tag.STATUS.toString(),
- type + "_" + processChunkType.toString());
- writeInfoCounterMap
- .get(type)
- .get(compactionType)
- .put(processChunkType,
DoNothingMetricManager.DO_NOTHING_COUNTER);
- }
- }
+ for (CompactionType compactionType : CompactionType.values()) {
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_WRITTEN.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ NOT_ALIGNED);
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_WRITTEN.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ ALIGNED);
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_WRITTEN.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ METADATA);
}
metricService.remove(
MetricType.COUNTER,
@@ -137,13 +157,12 @@ public class CompactionMetrics implements IMetricSet {
}
public void recordWriteInfo(
- CompactionType compactionType,
- ProcessChunkType processChunkType,
- boolean aligned,
- long byteNum) {
- String type = aligned ? "aligned" : "not_aligned";
-
writeInfoCounterMap.get(type).get(compactionType).get(processChunkType).inc(byteNum
/ 1024L);
- totalCompactionWriteInfoCounter.inc(byteNum / 1024L);
+ CompactionType compactionType, CompactionIoDataType dataType, long
byteNum) {
+ Counter[] counters = writeCounters.get(compactionType.toString());
+ if (counters != null) {
+ counters[dataType.getValue()].inc(byteNum);
+ }
+ totalCompactionWriteInfoCounter.inc(byteNum);
}
// endregion
@@ -152,18 +171,73 @@ public class CompactionMetrics implements IMetricSet {
private Counter totalCompactionReadInfoCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private void bindReadInfo(AbstractMetricService metricService) {
+ for (CompactionType compactionType : CompactionType.values()) {
+ readCounters.put(
+ compactionType.toString(),
+ new Counter[] {
+ metricService.getOrCreateCounter(
+ Metric.DATA_READ.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ NOT_ALIGNED),
+ metricService.getOrCreateCounter(
+ Metric.DATA_READ.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ ALIGNED),
+ metricService.getOrCreateCounter(
+ Metric.DATA_READ.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ METADATA)
+ });
+ }
totalCompactionReadInfoCounter =
metricService.getOrCreateCounter(
Metric.DATA_READ.toString(), MetricLevel.IMPORTANT,
Tag.NAME.toString(), "compaction");
}
private void unbindReadInfo(AbstractMetricService metricService) {
+ for (CompactionType compactionType : CompactionType.values()) {
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_READ.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ NOT_ALIGNED);
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_READ.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ ALIGNED);
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.DATA_READ.toString(),
+ Tag.TYPE.toString(),
+ compactionType.toString(),
+ Tag.NAME.toString(),
+ METADATA);
+ }
metricService.remove(
MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(),
"compaction");
}
- public void recordReadInfo(long byteNum) {
- totalCompactionReadInfoCounter.inc(byteNum / 1024L);
+ public void recordReadInfo(
+ CompactionType compactionType, CompactionIoDataType dataType, long
byteNum) {
+ Counter[] counters = readCounters.get(compactionType.toString());
+ if (counters != null) {
+ counters[dataType.getValue()].inc(byteNum);
+ }
+ totalCompactionReadInfoCounter.inc(byteNum);
}
// endregion
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
index 1b82f2a512d..153d0388c2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
@@ -156,6 +156,7 @@ public class FileMetrics implements IMetricSet {
FileMetrics::getModFileNum,
Tag.NAME.toString(),
"mods");
+ checkIfThereRemainingData();
}
private void bindWalFileMetrics(AbstractMetricService metricService) {
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 76609747c37..075c8da0eae 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -82,8 +82,7 @@ public class TsFileDeviceIterator implements
Iterator<Pair<String, Boolean>> {
try {
// get the first measurement node of this device, to know if the device
is aligned
this.measurementNode =
- MetadataIndexNode.deserializeFrom(
- reader.readData(startEndPair.right[0], startEndPair.right[1]));
+ reader.readMetadataIndexNode(startEndPair.right[0],
startEndPair.right[1]);
boolean isAligned = reader.isAlignedDevice(measurementNode);
currentDevice = new Pair<>(startEndPair.left, isAligned);
return currentDevice;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8e760edaef4..8a8516f7f0b 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2250,6 +2250,18 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
}
+ /**
+ * Read MetadataIndexNode by start and end offset.
+ *
+ * @param start the start offset of the MetadataIndexNode
+ * @param end the end offset of the MetadataIndexNode
+ * @return MetadataIndexNode
+ * @throws IOException IOException
+ */
+ public MetadataIndexNode readMetadataIndexNode(long start, long end) throws
IOException {
+ return MetadataIndexNode.deserializeFrom(readData(start, end));
+ }
+
@Override
public int hashCode() {
return file.hashCode();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e62aeb40c7e..ab4ea6198f9 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -631,7 +631,7 @@ public class TsFileIOWriter implements AutoCloseable {
*
* @throws IOException
*/
- public void checkMetadataSizeAndMayFlush() throws IOException {
+ public int checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has
been written
if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
try {
@@ -642,11 +642,13 @@ public class TsFileIOWriter implements AutoCloseable {
chunkMetadataCount,
currentChunkMetadataSize / chunkMetadataCount);
}
- sortAndFlushChunkMetadata();
+ return sortAndFlushChunkMetadata();
} catch (IOException e) {
logger.error("Meets exception when flushing metadata to temp file for
{}", file, e);
throw e;
}
+ } else {
+ return 0;
}
}
@@ -656,7 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
*
* @throws IOException
*/
- protected void sortAndFlushChunkMetadata() throws IOException {
+ protected int sortAndFlushChunkMetadata() throws IOException {
+ int writtenSize = 0;
// group by series
List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
@@ -673,7 +676,7 @@ public class TsFileIOWriter implements AutoCloseable {
pathCount++;
}
List<IChunkMetadata> iChunkMetadataList = pair.right;
- writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+ writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList,
seriesPath, isNewPath);
lastSerializePath = seriesPath;
logger.debug("Flushing {}", seriesPath);
}
@@ -684,11 +687,13 @@ public class TsFileIOWriter implements AutoCloseable {
}
chunkMetadataCount = 0;
currentChunkMetadataSize = 0;
+ return writtenSize;
}
- private void writeChunkMetadataToTempFile(
+ private int writeChunkMetadataToTempFile(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean
isNewPath)
throws IOException {
+ int writtenSize = 0;
// [DeviceId] measurementId datatype size chunkMetadataBuffer
if (lastSerializePath == null
|| !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
@@ -696,20 +701,25 @@ public class TsFileIOWriter implements AutoCloseable {
endPosInCMTForDevice.add(tempOutput.getPosition());
// serialize the device
// for each device, we only serialize it once, in order to save io
- ReadWriteIOUtils.write(seriesPath.getDevice(),
tempOutput.wrapAsStream());
+ writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(),
tempOutput.wrapAsStream());
}
if (isNewPath && iChunkMetadataList.size() > 0) {
// serialize the public info of this measurement
- ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(),
tempOutput.wrapAsStream());
- ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(),
tempOutput.wrapAsStream());
+ writtenSize +=
+ ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(),
tempOutput.wrapAsStream());
+ writtenSize +=
+ ReadWriteIOUtils.write(
+ iChunkMetadataList.get(0).getDataType(),
tempOutput.wrapAsStream());
}
PublicBAOS buffer = new PublicBAOS();
int totalSize = 0;
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
totalSize += chunkMetadata.serializeTo(buffer, true);
}
- ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+ writtenSize += ReadWriteIOUtils.write(totalSize,
tempOutput.wrapAsStream());
buffer.writeTo(tempOutput);
+ writtenSize += buffer.size();
+ return writtenSize;
}
public String getCurrentChunkGroupDeviceId() {